-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkvstore.cc
350 lines (326 loc) · 8.41 KB
/
kvstore.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
#include "kvstore.h"
/**
 * Build a store rooted at `dir`.
 *
 * Allocates the memtable, the SSTable index and the compaction buffer, then
 * calls initialize() to pick up any level directories already present.
 */
KVStore::KVStore(const std::string &dir) : KVStoreAPI(dir)
{
    // Plain state first; initialize() may raise maxTimeStamp later.
    maxTimeStamp = 0;
    root = dir;

    // Owned helpers, released in the destructor.
    memTable = new Skiplist();
    index = new Index();
    buffer = new Buffer();

    // Must run last: it reads `root` and uses `index`.
    initialize();
}
/**
 * Persist any pending memtable entries and release the owned helpers.
 *
 * The flush is skipped when the memtable holds no entries, so that opening and
 * closing a store (or destroying it right after reset()) does not leave an
 * empty SSTable file behind in level 0.
 */
KVStore::~KVStore()
{
    if (memTable->size() > 0)
    {
        saveToDisk();
    }
    delete memTable;
    delete index;
    delete buffer;
}
void KVStore::initialize()
{
std::string dirPath;
int level;
for (level = 0, dirPath = getLevelPath(level); utils::dirExists(dirPath); dirPath = getLevelPath(++level))
{
levelFilesNum.push_back(0);
int i = 0;
while (true)
{
std::string filePath = getSSTablePath(level, i);
fstream *in = new fstream(filePath.c_str(), ios::binary | ios::in);
if (!in->is_open())
{
in->close();
delete in;
break;
}
levelFilesNum[level]++;
SSTable *tmp = index->readFile(level, i, in);
if (tmp->getTimeStamp() >= maxTimeStamp)
{
maxTimeStamp = tmp->getTimeStamp() + 1;
}
i += 1;
in->close();
delete in;
}
}
}
/**
 * Insert or update a key-value pair.
 *
 * If adding the pair would push the serialized memtable past 2 MiB, the
 * memtable is flushed to a level-0 SSTable first, and level 0 is compacted once
 * it holds three or more files. Returns nothing.
 */
void KVStore::put(uint64_t key, const std::string &s)
{
    // 8 and 4 are the bytes one index entry takes: the key and its offset.
    const auto projectedSize = memTableSize() + 8 + 4 + s.size();
    const bool overflows = projectedSize > 2 * 1024 * 1024;
    if (overflows)
    {
        saveToDisk();
        if (levelFilesNum[0] >= 3)
        {
            compact(0);
        }
    }
    memTable->insert(key, s);
}
/**
 * Look up the value stored under `key`.
 *
 * The memtable is consulted first; a deletion marker there means the key is
 * gone. Only when the memtable has no entry are the SSTables searched.
 * An empty string indicates "not found".
 */
std::string KVStore::get(uint64_t key)
{
    const std::string inMemory = memTable->getValue(key);
    if (inMemory == "~DELETED~")
    {
        return ""; // deleted in the memtable; do not fall through to disk
    }
    if (!inMemory.empty())
    {
        return inMemory;
    }
    // Nothing in the memtable: fall back to the SSTables.
    return findInSSTable(key);
}
/**
 * Delete the pair stored under `key`, if any.
 *
 * Deletion is recorded by storing the "~DELETED~" marker as the key's value.
 * Returns false iff the key is not found (absent, or already marked deleted).
 */
bool KVStore::del(uint64_t key)
{
    const std::string inMemory = memTable->getValue(key);
    if (inMemory == "~DELETED~")
    {
        return false; // already deleted
    }

    if (inMemory.empty())
    {
        // Not in the memtable: the key must exist on disk to be deletable.
        const std::string onDisk = findInSSTable(key);
        if (onDisk.empty() || onDisk == "~DELETED~")
        {
            return false;
        }
    }
    else
    {
        // Live value in the memtable: drop it before writing the marker.
        memTable->remove(key);
    }

    put(key, "~DELETED~");
    return true;
}
/**
* This resets the kvstore. All key-value pairs should be removed,
* including memtable and all sstables files.
*/
void KVStore::reset()
{
memTable->clear();
index->clear();
std::string dirPath;
int level;
for (level = 0, dirPath = getLevelPath(level); utils::dirExists(dirPath); dirPath = getLevelPath(++level))
{
std::vector<string> files;
utils::scanDir(dirPath, files);
for (std::vector<string>::iterator it = files.begin(); it != files.end(); ++it)
{
std::string filePath = dirPath + *it;
utils::rmfile(filePath.c_str());
}
utils::rmdir(dirPath.c_str());
}
}
/**
 * Size in bytes the memtable would occupy if written out as an SSTable now.
 *
 * Sum of the fixed part (header plus bloom filter), the index region and the
 * data region.
 */
int KVStore::memTableSize()
{
    // Header (32 bytes) plus bloom filter (10240 bytes).
    const int fixedBytes = 32 + 10240;
    // One index entry per pair: 8-byte key plus 4-byte offset.
    const int indexBytes = memTable->size() * (8 + 4);
    // Data region, as reported by the memtable.
    const int dataBytes = memTable->dataSize();
    return fixedBytes + indexBytes + dataBytes;
}
void KVStore::saveToDisk()
{
std::string foldPath, SSTablePath;
foldPath = generateLevel(0); //生成第0层(内部自带判断是否已经存在某一层,若存在则不生成)
SSTablePath = getSSTablePath(0, levelFilesNum[0]);
levelFilesNum[0]++;
fstream out(SSTablePath.c_str(), ios::binary | ios::out);
//写入时间戳
out.write((char *)&maxTimeStamp, sizeof(maxTimeStamp));
maxTimeStamp++;
//写入键值对数量
uint64_t size = memTable->size();
out.write((char *)&size, sizeof(size));
//写入键最小值和最大值
uint64_t minKey = memTable->getMinKey();
uint64_t maxKey = memTable->getMaxKey();
out.write((char *)&minKey, sizeof(minKey));
out.write((char *)&maxKey, sizeof(maxKey));
//写入BloomFilter
vector<bool> BF = memTable->genBFVector();
for (size_t i = 0; i < BF.size(); i += 8)
{
char b = 0; //每次一个byte一个byte的写入文件
for (int j = 0; j < 8; ++j)
{
if (BF[i + j])
{
b = b | (1 << (7 - j)); //设置byte的位
}
}
out.write(&b, sizeof(b));
}
//写入索引区
uint32_t offset = 32 + 10240 + size * (8 + 4);
Node *p = memTable->getLowestHead();
p = p->right;
while (p)
{
out.write((char *)&(p->key), sizeof(p->key));
out.write((char *)&offset, sizeof(offset));
offset += p->val.size() + 1;
p = p->right;
}
//写入数据区
p = memTable->getLowestHead();
p = p->right;
while (p)
{
out.write((char *)(p->val.c_str()), p->val.size() + 1);
p = p->right;
}
//关闭文件
out.close();
//读取到index
fstream *in = new fstream(SSTablePath.c_str(), ios::binary | ios::in);
index->readFile(0, levelFilesNum[0] - 1, in);
in->close();
delete in;
memTable->clear(); //写入完成后,应清空memTable
}
/**
 * Directory path of the given level: "<root>/level-<level>/".
 */
std::string KVStore::getLevelPath(int level)
{
    return root + "/level-" + std::to_string(level) + "/";
}
/**
 * Make sure the directory for `level` exists and return its path.
 *
 * When the directory has to be created, a file count of 0 is appended to
 * levelFilesNum for it. An existing directory is left untouched.
 */
std::string KVStore::generateLevel(int level)
{
    const std::string levelDir = getLevelPath(level);
    if (utils::dirExists(levelDir))
    {
        return levelDir;
    }
    utils::mkdir(levelDir.c_str());
    levelFilesNum.push_back(0); // a new level starts with no files
    return levelDir;
}
/**
 * Path of SSTable number `i` inside `level`:
 * the level directory followed by "/SSTable<i>.sst".
 */
std::string KVStore::getSSTablePath(int level, int i)
{
    return getLevelPath(level) + "/SSTable" + std::to_string(i) + ".sst";
}
std::string KVStore::findInSSTable(uint64_t key)
{
std::string result;
char buf[200000] = {0};
uint32_t offset = 0;
SSTable *SSTp = index->search(key, offset);
if (!SSTp)
return "";
fstream in(getSSTablePath(SSTp->getLevel(), SSTp->getId()), ios::binary | ios::in);
in.seekg(offset, ios::beg);
in.get(buf, 200000, '\0');
in.close();
result = buf;
if (result == "~DELETED~")
return "";
return result;
}
void KVStore::compact(int level)
{
buffer->clear();
string SSTablePath;
int limit = 0;
if (level == 0)
limit = 0; //若是第0层,从第0个SSTable开始合并
else
limit = 1 << (level + 1);
int oldLevelFilesNum = levelFilesNum[level];
//把该层要合并的SSTable读入buffer,并删除文件
for (int i = limit; i < oldLevelFilesNum; i++)
{
SSTablePath = getSSTablePath(level, i);
fstream *in = new fstream(SSTablePath.c_str(), ios::binary | ios::in);
buffer->readFile(in);
in->close();
delete in;
utils::rmfile(SSTablePath.c_str());
index->deleteFileIndex(level, i);
levelFilesNum[level]--;
}
int nextLevel = level + 1;
vector<int> interSSTable; //下一层键值有交集的SSTable的id
if (levelFilesNum.size() == nextLevel) //下一层为空
{
buffer->compact(true); //合并时需要删除~DELETED~
generateLevel(nextLevel); //创建下一层文件夹
}
else
{
oldLevelFilesNum = levelFilesNum[nextLevel];
int minKey = buffer->getMinKey();
int maxKey = buffer->getMaxKey();
interSSTable = index->findIntersectionId(nextLevel, minKey, maxKey);
for (size_t i = 0; i < interSSTable.size(); i++) //把下层需要合并的读进去,并删除文件
{
SSTablePath = getSSTablePath(nextLevel, interSSTable[i]);
fstream *in = new fstream(SSTablePath.c_str(), ios::binary | ios::in);
buffer->readFile(in);
in->close();
delete in;
utils::rmfile(SSTablePath.c_str());
levelFilesNum[nextLevel]--;
}
for (int i = 0, k = 0; i < oldLevelFilesNum; i++) //重命名下层文件名
{
if (SSTableFileExists(nextLevel, i))
{
rename(getSSTablePath(nextLevel, i).c_str(), getSSTablePath(nextLevel, k).c_str());
++k;
}
}
buffer->compact(false);
}
while (!buffer->isOutputEmpty())
{
if (SSTableFileExists(nextLevel, 0)) //在开头插入,已有的文件名依次往后挪一个
{
for (int i = levelFilesNum[nextLevel] - 1; i >= 0; --i)
{
rename(getSSTablePath(nextLevel, i).c_str(), getSSTablePath(nextLevel, i + 1).c_str());
index->changeIndex(nextLevel, i, i + 1);
}
}
SSTablePath = getSSTablePath(nextLevel, 0);
levelFilesNum[nextLevel]++;
fstream *out = new fstream(SSTablePath.c_str(), ios::binary | ios::out);
buffer->write(out);
out->close();
delete out;
//读进index
fstream *in = new fstream(SSTablePath.c_str(), ios::binary | ios::in);
index->readFile(nextLevel, 0, in);
in->close();
delete in;
}
if (levelFilesNum[nextLevel] > (1 << (nextLevel + 1)))
compact(nextLevel);
}
bool KVStore::SSTableFileExists(int level, int id)
{
bool result = true;
string SSTablePath = getSSTablePath(level, id);
fstream *in = new fstream(SSTablePath.c_str(), ios::binary | ios::in);
if (!in->is_open())
{
result = false;
}
in->close();
delete in;
return result;
}