-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhelperFunctions.cpp
341 lines (263 loc) · 10.2 KB
/
helperFunctions.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
#include "helperFunctions.h"
// Error handler for calls that signal failure by returning -1.
//
// i       - return value of the call being checked.
// message - short description of the operation, echoed in the report.
//
// Returns i unchanged when it is not -1. When i is -1 the failure is
// reported on stderr and the whole process is terminated with status 1,
// so the function does not return in that case.
int errorcheck(int i, const char * message){
    if (i == -1){
        // Snapshot errno before doing any output: the stream writes below
        // are allowed to change it, which would misreport the real cause.
        const int saved_errno = errno;
        std::cerr << "Error Encountered with: " << message << std::endl;
        std::cerr << "Errno: " << saved_errno << std::endl;
        _exit(1);
    }
    // Not a failure: hand the original return value back to the caller.
    return i;
}
// bind docket to an address
int bind_port(int socketFD, short port){
// to pass the struct in bind
struct sockaddr_in server;
// address family is AF INET in the Internet domain
server.sin_family = AF_INET;
// address can be a specific IP or INADDR ANY (special address (0.0.0.0) meaning “any address”)
server.sin_addr.s_addr = htonl(INADDR_ANY);
// TCP port number
server.sin_port = htons(port);
// binding socket socketFD to address
return errorcheck(bind(socketFD, (SA *) &server, sizeof(server)), "binding socket");
}
// handles the client request for the communication thread
void* get_client_request(void* conFD){
int connectFD = *((int*)conFD);
delete (int*)conFD;
int buffer=4096;
// to read and write
size_t bytes_read;
char buff[buffer];
memset(buff, '\0', sizeof(char)*buffer);
string dirpath = "";
// read folder that was given
while((bytes_read = read(connectFD, &buff, sizeof(char)*buffer))>0){
dirpath +=buff;
memset(buff, '\0', sizeof(char)*buffer);
if (bytes_read < buffer) break;
}
// read error handling
errorcheck(bytes_read, "read from socket");
// confirmation message
std::cout << "[Thread: "<< std::hash<std::thread::id>()(std::this_thread::get_id()) << "]: About to scan directory "<< dirpath << std::endl;
// how many files are needed to be processed
count_files(dirpath, connectFD);
// block size that server uses
uint32_t server_block_size = htonl(block_size);
write(connectFD, &server_block_size, sizeof(uint32_t));
// pass the file count to the client
int counter= unsatisfied_clients[connectFD];
uint32_t count = htonl(counter);
write(connectFD, &count, sizeof(uint32_t));
// katalog recursively finds and adds the files in the queue
katalog(dirpath, connectFD);
return NULL;
}
// Recursively counts the regular files in dirpath and all its subdirectories,
// adding one to unsatisfied_clients[connectFD] for every file found.
//
// dirpath   - directory to scan; one trailing '\0' is tolerated and a
//             trailing '/' is added when missing.
// connectFD - client socket descriptor, used as the key in unsatisfied_clients.
//
// A directory that cannot be opened is reported through errorcheck(), which
// terminates the process. Entries that cannot be stat()ed are skipped.
void count_files(string dirpath, int connectFD){
    // Normalise the path. The empty() checks matter: back() on an empty
    // string is undefined behaviour.
    if(!dirpath.empty() && dirpath.back()=='\0'){
        dirpath.pop_back();
    }
    if(!dirpath.empty() && dirpath.back()!='/'){
        dirpath+="/";
    }

    DIR *dir = opendir(dirpath.c_str());
    if (dir == NULL) {
        // if it hasn't been found
        errorcheck(-1, "opening directory");
        return;
    }

    struct dirent *ent;
    // for each directory and file
    while ((ent = readdir(dir)) != NULL) {
        // skip the current and parent directory entries
        if (!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")){
            continue;
        }
        const string pathfile = dirpath + ent->d_name;
        struct stat pathINFO;
        // If stat() fails, pathINFO is not filled in; reading st_mode would
        // use indeterminate data, so skip the entry.
        if (stat(pathfile.c_str(), &pathINFO) != 0){
            continue;
        }
        if( S_ISREG(pathINFO.st_mode)){
            pthread_mutex_lock(&uc_mutex);
            // operator[] creates the entry at 0 when the key is new, so this
            // covers both "first file" and "increase the counter".
            unsatisfied_clients[connectFD]+=1;
            pthread_mutex_unlock(&uc_mutex);
        }
        else if( S_ISDIR(pathINFO.st_mode)){
            // if it's a directory, count its contents recursively
            count_files(pathfile, connectFD);
        }
    }
    closedir(dir);
}
// Recursively walks dirpath and pushes one work item per regular file onto
// the shared clients queue.
//
// dirpath   - directory to scan; one trailing '\0' is tolerated and a
//             trailing '/' is added when missing.
// connectFD - client socket descriptor the files are destined for.
//
// Each queue entry has the form "<connectFD> <path>" followed by a '\0'
// character. While the queue holds queue_size or more entries the caller
// waits on full_pool; after each push pool_cond is signalled.
//
// A directory that cannot be opened is reported through errorcheck(), which
// terminates the process. Entries that cannot be stat()ed are skipped.
void katalog(string dirpath, int connectFD){
    // Normalise the path. The empty() checks matter: back() on an empty
    // string is undefined behaviour.
    if(!dirpath.empty() && dirpath.back()=='\0'){
        dirpath.pop_back();
    }
    if(!dirpath.empty() && dirpath.back()!='/'){
        dirpath+="/";
    }

    // open the directory
    DIR *dir = opendir(dirpath.c_str());
    if (dir == NULL) {
        // error with open
        errorcheck(-1, "opening directory");
        return;
    }

    struct dirent *ent;
    // for each entry of the directory
    while ((ent = readdir(dir)) != NULL) {
        // skip the current and parent directory entries
        if (!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, "..")){
            continue;
        }
        const string pathfile = dirpath + ent->d_name;
        struct stat pathINFO;
        // If stat() fails, pathINFO is not filled in; reading st_mode would
        // use indeterminate data, so skip the entry.
        if (stat(pathfile.c_str(), &pathINFO) != 0){
            continue;
        }
        if( S_ISREG(pathINFO.st_mode)){
            // socket first, then the file path; the trailing '\0' keeps the
            // queue entry format unchanged
            string client_and_file = std::to_string(connectFD);
            client_and_file += " ";
            client_and_file += pathfile;
            client_and_file += '\0';

            pthread_mutex_lock(&pool_mutex);
            // if the queue is full wait
            while(clients.size()>=queue_size){
                pthread_cond_wait(&full_pool, &pool_mutex);
            }
            // confirmation message (path printed without the NUL byte)
            std::cout << "[Thread: "<< std::hash<std::thread::id>()(std::this_thread::get_id()) << "]: Adding file "<< pathfile<<" to the queue..."<< std::endl;
            // push the new work item and wake a waiting worker
            clients.push(client_and_file);
            pthread_cond_signal(&pool_cond);
            pthread_mutex_unlock(&pool_mutex);
        }
        else if( S_ISDIR(pathINFO.st_mode)){
            // since it is a directory, recurse into it
            katalog(pathfile, connectFD);
        }
    }
    closedir(dir);
}
// assigns a work from the queue to the worker thread
void* worker_thread(void* argi){
while(1){
// lock first
pthread_mutex_lock(&pool_mutex);
// if queue is empty wait on the cond
while(clients.empty())
pthread_cond_wait(&pool_cond, &pool_mutex);
// remove the work from the queue
string clientrequest = clients.front();
clients.pop();
// print confirmation message
std::cout << "[Thread: "<< std::hash<std::thread::id>()(std::this_thread::get_id()) << "]: Received task: <"<<clientrequest<<">"<< std::endl;
// signal waiting threads on full pool
pthread_cond_signal(&full_pool);
pthread_mutex_unlock(&pool_mutex);
// call the function that sends the file and metadata to the client
handle_client_request(clientrequest);
}
return NULL;
}
// Writes exactly len bytes from data to fd, retrying after short writes and
// after EINTR. Returns false when write() fails for any other reason or makes
// no progress.
static bool send_all_bytes(int fd, const void* data, size_t len){
    const char* cursor = static_cast<const char*>(data);
    while (len > 0){
        const ssize_t written = write(fd, cursor, len);
        if (written <= 0){
            if (written < 0 && errno == EINTR){
                continue;
            }
            return false;
        }
        cursor += written;
        len -= static_cast<size_t>(written);
    }
    return true;
}

// Sends one file and its metadata to a client.
//
// client_request - work item of the form "<socket fd> <file path>"; the path
//                  ends at the first '\0' or at the end of the string.
//
// Data written to the socket, in order:
//   1. uint32_t, network byte order: length of the path including its
//      terminating '\0'
//   2. the path bytes followed by '\0'
//   3. uint32_t, network byte order: file size (st_size converted to 32 bits)
//   4. the file contents, read in chunks of block_size bytes
//
// The per-socket mutex from socketmap is held for the whole transfer.
// Afterwards the client's counter in unsatisfied_clients is decreased; when
// this was the client's last file the entry is erased and the socket closed.
//
// Always returns NULL. A malformed request, a missing map entry, or a failing
// open()/fstat() goes through errorcheck(), which terminates the process.
void* handle_client_request(string client_request){
    // split "<fd> <path>" at the first space
    const size_t space = client_request.find(' ');
    if (space == string::npos){
        errorcheck(-1, "malformed client request");
        return NULL;
    }
    const int connectFD = std::stoi(client_request.substr(0, space));

    // the path runs up to the first '\0' (or the end of the string)
    const size_t path_begin = space + 1;
    const size_t path_end = client_request.find('\0', path_begin);
    const string file_in_question = (path_end == string::npos)
        ? client_request.substr(path_begin)
        : client_request.substr(path_begin, path_end - path_begin);

    // Look the socket's mutex up while holding socket_mutex and keep a pointer
    // to it, so the map itself is never touched outside that lock.
    // NOTE(review): assumes socketmap is a node-based map (element addresses
    // stay valid across insertions) and that entries are not erased while a
    // worker still uses them - confirm against the code that fills socketmap.
    pthread_mutex_lock(&socket_mutex);
    auto sock_entry = socketmap.find(connectFD);
    if(sock_entry==socketmap.end()){
        pthread_mutex_unlock(&socket_mutex);
        errorcheck(-1, "mutex for socket unavailable");
        return NULL;
    }
    auto* const sock_lock = &sock_entry->second;
    pthread_mutex_unlock(&socket_mutex);

    // lock the socket for the duration of the transfer
    pthread_mutex_lock(sock_lock);
    std::cout << "[Thread: "<< std::hash<std::thread::id>()(std::this_thread::get_id()) << "]: About to read file "<<file_in_question << std::endl;

    // open the file to be sent
    int FDR = errorcheck(open(file_in_question.c_str(), O_RDONLY), "open");

    // Query the size through the open descriptor so it describes the file
    // that is actually going to be read, and check the result.
    struct stat metadata;
    errorcheck(fstat(FDR, &metadata), "fstat");

    // The path length sent on the wire counts the terminating '\0', and the
    // terminator itself is sent too (c_str() guarantees it is there).
    const size_t name_len = file_in_question.length() + 1;
    const uint32_t name_size = htonl(static_cast<uint32_t>(name_len));
    // uint32_t for Linux (adapt accordingly for different systems)
    const uint32_t file_size = htonl(static_cast<uint32_t>(metadata.st_size));

    bool sent = send_all_bytes(connectFD, &name_size, sizeof(name_size))
             && send_all_bytes(connectFD, file_in_question.c_str(), name_len)
             && send_all_bytes(connectFD, &file_size, sizeof(file_size));

    // Send the file in block_size chunks. The buffer lives on the heap because
    // block_size is only known at run time. read() returns ssize_t: it must
    // stay signed so an error (-1) stops the loop instead of being used as a
    // byte count for write().
    string block(static_cast<size_t>(block_size), '\0');
    ssize_t bytes_read = 0;
    while(sent && (bytes_read = read(FDR, &block[0], block.size())) > 0){
        sent = send_all_bytes(connectFD, block.data(), static_cast<size_t>(bytes_read));
    }
    if (!sent || bytes_read < 0){
        // Non-fatal: the bookkeeping below must still run for this client.
        std::cerr << "Error Encountered with: sending file " << file_in_question << std::endl;
    }

    // close the open file
    close(FDR);

    // update the client's outstanding-file counter
    pthread_mutex_lock(&uc_mutex);
    auto pending = unsatisfied_clients.find(connectFD);
    if(pending==unsatisfied_clients.end()){
        errorcheck(-1, "map unsatisfied_clients doesn't have this key.");
    }
    else if(pending->second>1){ // not the last file: decrease the counter
        pending->second-=1;
    }
    else { // all files have been sent: drop the entry and close the socket
        unsatisfied_clients.erase(pending);
        close(connectFD);
    }
    pthread_mutex_unlock(&uc_mutex);

    // unlock the socket mutex
    pthread_mutex_unlock(sock_lock);
    return NULL;
}