1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
|
#include <map>
#include <array>
#include <iostream>
#include <boost/iostreams/device/file_descriptor.hpp>
#include <boost/iostreams/stream_buffer.hpp>
#include <boost/iostreams/stream.hpp>
#include <queue>
using namespace std;
using boost::iostreams::file_descriptor_source;
using boost::iostreams::stream_buffer;
using boost::iostreams::stream;
using boost::iostreams::file_descriptor_flags;
typedef stream<file_descriptor_source> Src;
typedef tuple<Src*, int, string, string> PqP;
struct Cmp {
bool operator() (PqP &x1, PqP &x2) {
if (get<2>(x1) > get<2>(x2)) return true;
if (get<2>(x1) < get<2>(x2)) return false;
return get<1>(x1) > get<1>(x2);
}
};
int main(int argc, char **argv) {
// the fds we use as inputs; externally supplied
auto fds = {3, 4};
// in-line delimiters; first one is used for output
array<char, 3> delim = {'\t', ' ', 0};
// default cell value if none is supplied on a line
string cellDef = "";
if (argc == 2)
cellDef = argv[1];
priority_queue<PqP, vector<PqP>, Cmp> queue;
auto tryRead = [&](Src *src, int fd) {
array<char, 1000> str;
src->getline(str.data(), str.size());
if (*src) {
char *delimPos = find_first_of(str.begin(), str.end(), delim.begin(), delim.end());
queue.emplace(src, fd,
string(str.data(), delimPos - str.data()),
string(*delimPos ? (delimPos+1) : delimPos));
}else{
delete src;
}
};
// init queue with new streams
for (auto fd : fds)
tryRead(new Src(file_descriptor_source(fd, boost::iostreams::close_handle)), fd);
// iterate over prio queue to print joined strings
string curKey("");
map<int, string> curVals;
auto maybeFlushLine = [&]() {
if (curVals.empty())
return;
cout << curKey;
for (auto fd : fds) {
cout << delim[0];
if (curVals.count(fd)) {
cout << curVals[fd];
}else{
cout << cellDef;
}
}
cout << endl;
};
while (!queue.empty()) {
PqP cur = queue.top();
queue.pop();
// start next line if we see a new key the first time
if (get<2>(cur) != curKey) {
maybeFlushLine();
curKey = get<2>(cur);
curVals.clear();
}
// add value to line buffer
auto fd = get<1>(cur);
assert(!curVals.count(fd));
curVals[fd] = get<3>(cur);
// read next element from the file we just processed a line from
tryRead(get<0>(cur), get<1>(cur));
}
maybeFlushLine();
return EXIT_SUCCESS;
}
|