-
Notifications
You must be signed in to change notification settings - Fork 3
/
main_count_distribution.cpp
117 lines (93 loc) · 3.15 KB
/
main_count_distribution.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#include <iostream>
#include <queue>
#include <vector>
#include <tuple>
#include <defs.h>
#include <Utils/ThrillBinaryReader.h>
/// One entry of the k-way merge buffer: an edge paired with the index of the
/// reader ("queue") it was taken from, so the buffer can be refilled from the
/// same reader after the entry is popped.
struct pq_item {
    edge_t edge;   // edge value taken from the reader
    size_t queue;  // index of the reader that produced `edge`

    pq_item(const edge_t edge = edge_t{0,0}, const size_t queue = 0)
        : edge(edge)
        , queue(queue)
    {}

    /// Deliberately reversed ordering: *this is "less" than `other` when
    /// (edge, queue) is lexicographically GREATER. std::priority_queue pops
    /// its greatest element, so with this ordering it behaves as a min-heap
    /// and yields the smallest (edge, queue) pair first.
    bool operator<(const pq_item& other) const {
        return std::tie(other.edge, other.queue) < std::tie(edge, queue);
    }
};
int main(int argc, char* argv[]) {
std::vector<ThrillBinaryReader> readers(argc - 1);
std::vector<node_t> counts(argc, 0);
size_t reader_active = readers.size();
std::priority_queue<pq_item> buffer;
auto insert_to_buffer = [&] (size_t idx) {
auto& reader = readers.at(idx);
if (reader.empty()) {
reader_active--;
std::cout << "Consumed reader " << idx << " after " << reader.edges_read() << " edges\n";
return;
}
buffer.emplace(*reader, idx);
++reader;
};
auto fetch_and_reload = [&] () -> edge_t {
pq_item result = buffer.top();
buffer.pop();
insert_to_buffer(result.queue);
return result.edge;
};
// open file and fill buffer
for(size_t i=1; i< static_cast<size_t>(argc); i++) {
auto& reader = readers.at(i - 1);
reader.open(argv[i]);
if (reader.empty()) {
std::cout << "File " << argv[i] << " seems empty" << std::endl;
}
insert_to_buffer(i - 1);
}
// consume readers
if (!buffer.empty()) {
edge_t last_edge = fetch_and_reload();
uint_t multiplicity = 1;
while(!buffer.empty()) {
const edge_t current_edge = fetch_and_reload();
if (current_edge != last_edge) {
counts.at(multiplicity)++;
multiplicity = 0;
last_edge = current_edge;
}
++multiplicity;
}
counts.at(multiplicity)++;
}
// output result
edgeid_t edges = 0;
edgeid_t unique_edges = 0;
for(size_t i=0; i<counts.size(); ++i) {
std::cout << i << "\t" << counts[i] << " # DISTR\n";
edges += i * counts[i];
unique_edges += counts[i];
}
std::cout << "Edges found: " << edges << "\n";
std::cout << "Unique edges: " << unique_edges << " ("
<< (1.0 * unique_edges / readers.size() / readers[0].edges_read()) << ")\n";
// Check every reader contributed the same number of edges;
{
auto dump_edges_per_reader = [&] () {
for(unsigned int i=0; i < readers.size(); ++i)
std::cout << " " << i << "\tEdges: " << readers[i].edges_read() << "\t File: " << argv[i+1];
};
for(unsigned int i=1; i < readers.size(); ++i) {
if (readers[i].edges_read() != readers[0].edges_read()) {
dump_edges_per_reader();
return -1;
}
}
const edgeid_t expected = readers.size() * readers[0].edges_read();
if (edges != expected) {
std::cout << "Edge mismatch -- expected " << expected << " counts\n";
return -1;
}
}
std::cout << "Done.\n";
return 0;
}