-
Notifications
You must be signed in to change notification settings - Fork 0
/
merger.c
155 lines (126 loc) · 3.61 KB
/
merger.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <glob.h>
#include <getopt.h>
// Handle for one index file taking part in the merge.
//   in    - the stdio stream backing this handle. Despite the name,
//           ifile_write() and ifile_flush() also use it for the output file.
//   entry - the most recent 64-bit entry loaded by ifile_read().
typedef struct ifile {
    FILE *in;
    uint64_t entry;
} ifile_t;
// Reset a handle to its empty state: no stream attached, entry cleared to 0.
void ifile_init( ifile_t *file ) {
    *file = (ifile_t){ 0 };
}
// Load the next fixed-width 64-bit entry from file->in into file->entry.
// The entry presumably packs a (term, doc) pair; that is not visible here.
// Returns 0 when a whole entry was read, and 1 on EOF, a short read, or a
// stream error. The two failure cases are not distinguished.
int ifile_read( ifile_t *file ) {
    size_t items = fread( &file->entry, sizeof file->entry, 1, file->in );
    if( items == 1 ) {
        return 0;
    }
    return 1;
}
// Close the stream attached to this handle and detach it (file->in = NULL).
// Safe to call on a handle whose stream was never opened or is already
// closed: fclose(NULL) is undefined behaviour, so it is skipped.
// This does NOT free the ifile_t itself; whoever allocated it owns it.
void ifile_close( ifile_t *file ) {
    if( file->in ) {
        fclose( file->in );
        file->in = NULL;   // avoid a dangling FILE* and any double fclose
    }
}
// Append one 64-bit entry, in native byte order, to the stream in file->in.
// A failed or short write is not reported here; check ferror() on the
// stream or the result of fclose() afterwards.
void ifile_write( ifile_t *file, uint64_t entry ) {
    FILE *dst = file->in;
    (void)fwrite( &entry, sizeof entry, 1, dst );
}
// Hand any bytes stdio has buffered for file->in to the operating system.
// The fflush() result is ignored.
void ifile_flush( ifile_t *file ) {
    FILE *stream = file->in;
    (void)fflush( stream );
}
static inline int compare_ifile( const void *va, const void *vb ) {
const ifile_t *a = *(ifile_t **)va;
const ifile_t *b = *(ifile_t **)vb;
return (a->entry - b->entry);
}
// Merge several individually sorted ifiles into `outs`, emitting entries in
// ascending order (as ordered by compare_ifile).
//
// `files` is used as a working array of the still-open inputs, kept sorted
// by each file's current entry. Each step:
//   1. writes files[0]'s entry,
//   2. advances that file,
//   3. re-inserts it with a binary search.
// For N total entries across M files, each step costs O(log M) comparisons
// plus up to O(M) pointer moves (memmove).
//
// Every input is closed by the time this returns, and the array is
// reordered. `outs` is flushed but not closed.
static void merge( ifile_t **files, size_t nfiles, ifile_t *outs ) {
    // Prime each input with its first entry; drop inputs that are empty.
    size_t k = 0;
    while( k < nfiles ) {
        if( ifile_read( files[k] ) ) {
            ifile_t *empty = files[k];
            ifile_close( empty );
            memmove( &files[k], &files[k+1], (nfiles-k-1)*sizeof(ifile_t*));
            nfiles--;
            // Park the closed file past the live range so the pointer is not
            // lost. Do not advance k: the next file now sits at index k.
            files[nfiles] = empty;
        } else {
            k++;
        }
    }

    if( nfiles > 1 ) {
        qsort( files, nfiles, sizeof(ifile_t *), compare_ifile );
    }

    while( nfiles ) {
        ifile_t *f = files[0];
        // Emit the smallest current entry.
        ifile_write( outs, f->entry );

        if( ifile_read( f ) ) {
            // This input is exhausted: remove it from the live range, park
            // it at the end, and close it.
            memmove( &files[0], &files[1], (nfiles-1)*sizeof(ifile_t*));
            nfiles--;
            files[nfiles] = f;
            ifile_close( f );
            continue;
        }
        if( nfiles == 1 ) {
            continue;   // nothing to re-order against
        }

        // Find the first index in files[1..nfiles) whose entry is >= f's new
        // entry. The comparator takes pointers to array elements, hence &f.
        size_t lo = 1;
        size_t hi = nfiles;
        while( lo < hi ) {
            size_t mid = lo + (hi - lo) / 2;
            if( compare_ifile( &f, &files[mid] ) <= 0 ) {
                hi = mid;
            } else {
                lo = mid + 1;
            }
        }
        // files[1..lo) sort strictly before f: shift them down one slot and
        // put f just after them.
        size_t smaller = lo - 1;
        memmove( &files[0], &files[1], smaller*sizeof(ifile_t*));
        files[smaller] = f;
    }
    ifile_flush( outs );
}
// Merge every file matching `-p <pattern>` into a single sorted file
// (`-o <path>`, default "output").
//
// The inputs are presumably the sorted (term, doc_id) files written by the
// `con` utility; this performs an N-way merge over them. Writing the result
// to stdout (e.g. to stream it to a remote object store) may be added later,
// because the merged files are expected to be large.
//
// Exits with EXIT_FAILURE on bad usage, when no file matches, or on any
// open, allocation, or write error.
int main( int argc, char **argv) {
    const char *pattern = NULL;
    const char *out_path = "output";
    int c;
    while((c=getopt(argc, argv, "p:o:")) != -1 ) {
        switch(c) {
        case 'p':
            pattern = optarg;   // argv storage lives for the whole program
            break;
        case 'o':
            out_path = optarg;
            break;
        default:
            fprintf( stderr, "Usage: %s -p pattern [-o output-file]\n", argv[0]);
            return EXIT_FAILURE;
        }
    }
    if( !pattern ) {
        fprintf( stderr, "Must specify wildcard pattern for files\n");
        return EXIT_FAILURE;
    }

    // Declared up front so every `goto cleanup` sees initialised state.
    int status = EXIT_FAILURE;
    glob_t bglob = {0};
    ifile_t *inputs = NULL;
    ifile_t **pfiles = NULL;
    size_t nfiles = 0;
    size_t opened = 0;   // number of inputs[] whose stream is open
    ifile_t out;
    ifile_init(&out);

    int grc = glob( pattern, 0, NULL, &bglob );
    if( grc != 0 ) {
        if( grc == GLOB_NOMATCH ) {
            fprintf( stderr, "No files match pattern '%s'\n", pattern);
        } else {
            fprintf( stderr, "glob failed for pattern '%s'\n", pattern);
        }
        goto cleanup;
    }

    nfiles = bglob.gl_pathc;
    inputs = calloc( nfiles, sizeof(*inputs));
    pfiles = calloc( nfiles, sizeof(*pfiles));
    if( !inputs || !pfiles ) {
        fprintf( stderr, "Out of memory\n");
        goto cleanup;
    }

    for( size_t k=0; k<nfiles; k++ ) {
        ifile_init( &inputs[k] );
        inputs[k].in = fopen( bglob.gl_pathv[k], "rb");
        if( !inputs[k].in ) {
            perror( bglob.gl_pathv[k] );
            goto cleanup;
        }
        pfiles[k] = &inputs[k];
        opened++;
    }

    out.in = fopen( out_path, "wb");
    if( !out.in ) {
        perror( out_path );
        goto cleanup;
    }

    // merge() closes every input it is given.
    merge( pfiles, nfiles, &out);
    opened = 0;

    // Write errors from fwrite/fflush are sticky on the stream; fclose() can
    // also fail while flushing the final buffer.
    if( ferror( out.in ) ) {
        fprintf( stderr, "Error writing %s\n", out_path);
        fclose( out.in );
    } else if( fclose( out.in ) != 0 ) {
        perror( out_path );
    } else {
        status = EXIT_SUCCESS;
    }
    out.in = NULL;

cleanup:
    for( size_t k=0; k<opened; k++ ) {
        ifile_close( &inputs[k] );
    }
    free( pfiles );
    free( inputs );
    globfree( &bglob );
    return status;
}