-
Notifications
You must be signed in to change notification settings - Fork 0
/
PageRank.cpp
89 lines (79 loc) · 2.04 KB
/
PageRank.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#include "Vertex.h"
#include "Comper.h"
#include "Runner.h"
#include "Combiner.h"
// Superstep limit used by PRVertex::compute(): a vertex keeps sending rank
// messages while step_num() < PAGERANK_ROUND and votes to halt otherwise.
#define PAGERANK_ROUND 10
// Combiner for PageRank messages: two double-valued messages are merged by
// adding them, so the accumulated value is the sum of everything combined.
class PRCombiner : public Combiner<double>
{
public:
	// Folds new_msg into old in place.
	//   old     - running total; updated to old + new_msg
	//   new_msg - incoming contribution (read-only)
	virtual void combine(double& old, const double& new_msg)
	{
		old = old + new_msg;
	}
};
//class PRVertex:public Vertex<VertexID, double, VertexID, double> //no combiner
// PageRank vertex program. PRCombiner is passed as the last template argument
// of Vertex<> (the "with combiner" variant of the declaration).
class PRVertex:public Vertex<VertexID, double, VertexID, double, PRCombiner> //with combiner
{
public:
	// Runs one superstep for this vertex.
	//   msgs  - rank contributions received for this superstep; they are summed
	//   edges - destination vertex ids; each one is sent value / degree
	//
	// Superstep 1 sets the rank to 1.0; every later superstep sets it to
	// 0.15 + 0.85 * sum(msgs). While step_num() < PAGERANK_ROUND the new rank is
	// distributed along the edges; from then on the vertex votes to halt.
	virtual void compute(vector<double>& msgs, vector<VertexID>& edges)
	{
		// Kept as two separate literals (not 1 - damping) so the arithmetic is
		// bit-identical to the original expression 0.15 + 0.85*sum.
		const double kBaseRank = 0.15;
		const double kDamping = 0.85;

		if(step_num() == 1)
		{
			value = 1.0;
		}
		else
		{
			double sum = 0;
			// size_t index: avoids the signed/unsigned comparison against size().
			for(size_t i = 0; i < msgs.size(); ++i) sum += msgs[i];
			value = kBaseRank + kDamping * sum;
		}

		if(step_num() >= PAGERANK_ROUND)
		{
			vote_to_halt();
			return;
		}

		// Nothing to send: skip the division entirely, since its result would be
		// unused (and degree is presumably 0 for such a vertex - TODO confirm).
		if(edges.empty()) return;

		double msg = value / degree; //value is double, so division gives double
		for(size_t i = 0; i < edges.size(); ++i) send_message(edges[i], msg);
	}
};
// Input parser and result writer for the PageRank job.
class PRComper:public Comper<PRVertex>
{
	char buf[100]; // scratch buffer for formatting one output line (BufferedWriter path)
public:
	// Parses one input line and serializes the vertex into file_stream.
	//
	// Line layout as consumed below: "<id>\t<numNbs> <nb1> <nb2> ...".
	// Written to file_stream, in order: id, 1.0 (initial rank), true (active
	// flag), numNbs, then numNbs neighbor ids. Returns the parsed vertex id.
	//
	// line is tokenized in place by strtok and is therefore modified.
	//
	// NOTE(review): strtok results are passed to atoi without a NULL check, so
	// an empty line or a neighbor list shorter than numNbs is undefined
	// behavior. Left unchanged here because the right recovery policy (skip,
	// abort, truncate) depends on how the caller uses the returned id - confirm
	// that the loader only feeds well-formed lines.
	virtual VertexID parseVertex(char* line, obinstream& file_stream)
	{
		char * pch = strtok(line, "\t");
		VertexID id = atoi(pch);
		file_stream << id; //write <I>
		file_stream << 1.0; //write <V>, init Pr = 1.0 (cannot use 1 as it is not double)
		file_stream << true; //write <active>
		pch=strtok(NULL, " ");
		int num=atoi(pch);
		file_stream << num; //write numNbs
		for(int i=0; i<num; i++)
		{
			pch=strtok(NULL, " ");
			VertexID nb = atoi(pch);
			file_stream << nb; //write <E>
		}
		return id;
	}

	// Writes "vid \t pagerank" followed by a newline to an ofstream.
	// Uses '\n' rather than endl so the stream is not flushed once per vertex.
	virtual void to_line(PRVertex& v, ofstream& fout)
	{
		fout<<v.id<<'\t'<<v.value<<'\n'; //report: vid \t pagerank
	}

	// Writes "vid \t pagerank" followed by a newline through a BufferedWriter.
	// snprintf bounds the write to buf; an over-long line would be truncated
	// instead of overrunning the 100-byte buffer.
	virtual void to_line(PRVertex& v, BufferedWriter& fout)
	{
		snprintf(buf, sizeof(buf), "%d\t%lf\n", v.id, v.value);
		fout.write(buf);
	}
};
// Entry point: runs PageRank, loading the graph from HDFS and dumping the
// result back to HDFS.
//   argv[1] - HDFS input path
//   argv[2] - HDFS output path
// argc/argv are also forwarded unchanged to Runner::runHH.
// Returns 0 after the run, or 1 if the two required paths are missing.
int main(int argc, char* argv[])
{
	// argv[1] and argv[2] are dereferenced below; building a std::string from a
	// missing argument is undefined behavior, so reject short command lines.
	if(argc < 3)
	{
		fprintf(stderr, "Usage: %s <hdfs_inpath> <hdfs_outpath>\n", argc > 0 ? argv[0] : "PageRank");
		return 1;
	}

	Runner<PRVertex, PRComper> runner;
	string hdfs_inpath = argv[1];
	string hdfs_outpath = argv[2];
	// NOTE(review): machine-specific scratch directory is hard-coded.
	string local_root = "/home/yanda/tmp/iopregel2";
	bool dump_with_edges = false;
	runner.runHH(hdfs_inpath, hdfs_outpath, local_root, dump_with_edges, argc, argv); //HDFS Load, HDFS Dump
	return 0;
}