-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpfs_fileserver.cpp
184 lines (136 loc) · 5.38 KB
/
pfs_fileserver.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#include "pfs_fileserver.hpp"
#include <grpcpp/grpcpp.h>
#include <iostream>
#include <fstream>
#include <string>
#include <glob.h>
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;
using google::protobuf::Empty;
using namespace std;
// Builds the on-disk path of a single block: "/scratch/<fd>-<blocknum>".
//   fd       - file identifier used as the first part of the file name.
//   blocknum - block index used as the second part of the file name.
// Returns the full path as a string; nothing is created or checked on disk.
string getFilePath(int fd, ll blocknum) {
    string path("/scratch/");
    path += to_string(fd);
    path += "-";
    path += to_string(blocknum);
    return path;
}
// Returns the glob pattern "/scratch/<fd>-*", which matches every path that
// getFilePath() produces for the given fd regardless of block number.
string getFilePathPattern(int fd) {
    string pattern = "/scratch/";
    pattern.append(to_string(fd)).append("-*");
    return pattern;
}
// Deletes every file matching getFilePathPattern(fd), i.e. all block files
// stored for the given fd.
//
// This is best effort: a glob() failure (including "no match") and failures
// to remove individual files are silently ignored, and nothing is reported
// back to the caller.
void deleteAllBlocks(int fd) {
    // Keep the pattern string alive for the whole call. Calling c_str() on
    // the returned temporary directly would leave a dangling pointer, because
    // the temporary is destroyed at the end of that statement, before glob()
    // gets to read it.
    const string pattern = getFilePathPattern(fd);

    glob_t glob_result;
    if (glob(pattern.c_str(), GLOB_MARK | GLOB_NOSORT, nullptr, &glob_result) != 0) {
        // Nothing matched or globbing failed: there is nothing to delete.
        return;
    }

    for (size_t i = 0; i < glob_result.gl_pathc; ++i) {
        // The result is deliberately ignored; a file that cannot be removed
        // must not stop the remaining ones from being deleted.
        static_cast<void>(std::remove(glob_result.gl_pathv[i]));
    }

    // Release the path list allocated by glob().
    globfree(&glob_result);
}
class ServerImpl final : public FileService::Service {
public:
Status isConnected(ServerContext* context, const Dummy* request, Empty* response) override {
//std::cout << "Received request: " << request->DebugString() << std::endl;
return Status::OK;
}
Status deleteFd(ServerContext* context, const File* request, Empty* response) override {
//std::cout << "Received request: " << request->DebugString() << std::endl;
int fd = request->fd();
deleteAllBlocks(fd);
return Status::OK;
}
Status readData(ServerContext* context, const Blockinfo* request, Data* response) override {
//std::cout << "Received request: " << request->DebugString() << std::endl;
int fd = request->fd();
ll blocknum = request->blocknum();
string filePath = getFilePath(fd, blocknum);
std::ifstream inFile(filePath, std::ios::binary);
if (inFile.is_open()) {
// Get the size of the file
inFile.seekg(0, std::ios::end);
std::streampos fileSize = inFile.tellg();
inFile.seekg(0, std::ios::beg);
// Read the binary data into a string
std::string binaryData(fileSize, '\0');
inFile.read(&binaryData[0], fileSize);
//cout<<"FS:" <<binaryData<<endl;
inFile.close();
response->set_data(binaryData);
} else {
string s;
response->set_data(s);
}
response->set_fd(fd); response->set_blocknum(blocknum);
return Status::OK;
}
Status writeData(ServerContext* context, const Data* request, Empty* response) override {
//std::cout << "Received request: " << request->DebugString() << std::endl;
int fd = request->fd();
ll blocknum = request->blocknum();
string filePath = getFilePath(fd, blocknum);
int fileDescriptor = open(filePath.c_str(), O_RDWR | O_CREAT, 0600);
if (fileDescriptor == -1) {
return Status::CANCELLED;
}
string data = request->data();
//cout<<"FS got" << data <<endl;
pwrite(fileDescriptor, data.c_str(), PFS_BLOCK_SIZE, 0);
close(fileDescriptor);
return Status::OK;
}
Status writeDataSpecific(ServerContext* context, const DataSpecific* request, Empty* response) override {
//std::cout << "Received request: " << request->DebugString() << std::endl;
int fd = request->fd();
ll blocknum = request->blocknum();
string filePath = getFilePath(fd, blocknum);
int fileDescriptor = open(filePath.c_str(), O_RDWR | O_CREAT, 0600);
if (fileDescriptor == -1) {
return Status::CANCELLED;
}
string data = request->data();
//cout<<"FS got" << data <<endl;
pwrite(fileDescriptor, data.c_str(), request->size(), request->startoffset());
close(fileDescriptor);
return Status::OK;
}
};
// Starts the file service on `server_address` using insecure credentials and
// blocks in Wait() while the server runs.
// If the server cannot be built or started, an error is printed to stderr and
// the function returns immediately.
void RunServer(string server_address) {
    ServerImpl service;
    ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    builder.RegisterService(&service);

    std::unique_ptr<Server> server(builder.BuildAndStart());
    // BuildAndStart() yields a null pointer on failure (for example when the
    // address cannot be bound); calling Wait() on it would dereference null.
    if (!server) {
        std::cerr << "File Server failed to start on " << server_address << std::endl;
        return;
    }

    std::cout << "File Server listening on " << server_address << std::endl;
    server->Wait();
}
// Entry point of the file server process.
// Prints a start banner, checks via isFileServer() whether this machine is
// listed as a file server, and if so runs the gRPC server on the address
// returned by getMyPort(). Returns 0 in every case, including when the
// machine is not listed.
int main(int argc, char *argv[])
{
    // The machine is currently identified by its IP address.
    // NOTE(review): the original comment says this should change once IP
    // addresses are stored in the server list file - confirm which file.
    string hostname(getMyIP().c_str());
    printf("%s:%s: Start! Hostname: %s, IP: %s\n", __FILE__, __func__, getMyHostname().c_str(), getMyIP().c_str());

    if (isFileServer(hostname)) {
        // Listed as a file server: serve requests until RunServer() returns.
        string serverAddress = getMyPort(hostname);
        cout << "File server " << serverAddress << " Up!" << endl;
        RunServer(serverAddress);
        printf("%s:%s: Finish!\n", __FILE__, __func__);
    } else {
        // Not listed: nothing to serve.
        printf("I am not listed as FileServer. exiting\n");
    }
    return 0;
}