Skip to content

Commit

Permalink
Merge pull request #59 from DICL/refactor-dfsput
Browse files Browse the repository at this point in the history
Refactor dfsput
  • Loading branch information
kbjin committed Mar 17, 2016
2 parents 7f2530d + 82aef35 commit edfedd6
Show file tree
Hide file tree
Showing 18 changed files with 235 additions and 202 deletions.
10 changes: 6 additions & 4 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ include tests/Makefile.am
AM_CPPFLAGS = -I@srcdir@/src/common -I@srcdir@/src -include ./config.h
AM_CXXFLAGS = -Wall -std=c++14 -ggdb3

bin_PROGRAMS = eclipse_node dfsput dfsget dfsls dfsinit dfsrm
bin_PROGRAMS = eclipse_node dfsput dfsget dfsls dfsformat dfsrm


messages_files = src/messages/boundaries.cc \
Expand All @@ -23,7 +23,9 @@ messages_files = src/messages/boundaries.cc \
src/messages/filedescription.cc \
src/messages/cacheinfo.cc \
src/messages/blockdel.cc \
src/messages/filedel.cc
src/messages/filedel.cc \
src/messages/formatrequest.cc

# libs -----
lib_LTLIBRARIES = libecfs.la
libecfs_la_SOURCES = src/common/hash.cc src/common/settings.cc \
Expand Down Expand Up @@ -70,10 +72,10 @@ dfsls_SOURCES = src/fs/dfsls.cc src/fs/directory.cc \

dfsls_LDADD = $(LDADD) -lsqlite3

dfsinit_SOURCES = src/fs/dfsinit.cc src/fs/directory.cc \
dfsformat_SOURCES = src/fs/dfsformat.cc src/fs/directory.cc \
$(messages_files)

dfsinit_LDADD = $(LDADD) -lsqlite3
dfsformat_LDADD = $(LDADD) -lsqlite3

dfsrm_SOURCES = src/fs/dfsrm.cc src/fs/directory.cc \
$(messages_files)
Expand Down
82 changes: 82 additions & 0 deletions src/fs/dfsformat.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include "common/context.hh"
#include "directory.hh"
#include "../messages/filelist.hh"
#include "../messages/boost_impl.hh"
#include "../messages/factory.hh"
#include <iostream>
#include <dirent.h>
#include <cstdio>
#include <boost/asio.hpp>
#include <iomanip>

using namespace std;
using namespace eclipse;
using namespace eclipse::messages;
using boost::asio::ip::tcp;
using vec_str = std::vector<std::string>;

boost::asio::io_service iosvc;

tcp::socket* connect (int index) {
tcp::socket* socket = new tcp::socket (iosvc);
Settings setted = Settings().load();

int port = setted.get<int> ("network.port_mapreduce");
vec_str nodes = setted.get<vec_str> ("network.nodes");

string host = nodes[ index ];

tcp::resolver resolver (iosvc);
tcp::resolver::query query (host, to_string(port));
tcp::resolver::iterator it (resolver.resolve(query));
auto ep = new tcp::endpoint (*it);
socket->connect(*ep);
delete ep;
return socket;
}

void send_message (tcp::socket* socket, eclipse::messages::Message* msg) {
string out = save_message(msg);
stringstream ss;
ss << setfill('0') << setw(16) << out.length() << out;

socket->send(boost::asio::buffer(ss.str()));
}

eclipse::messages::Reply* read_reply(tcp::socket* socket) {
char header[17] = {0};
header[16] = '\0';
socket->receive(boost::asio::buffer(header, 16));
size_t size_of_msg = atoi(header);
char* body = new char[size_of_msg];
socket->receive(boost::asio::buffer(body, size_of_msg));
string recv_msg(body, size_of_msg);
eclipse::messages::Message* m = load_message(recv_msg);
delete[] body;
return dynamic_cast<eclipse::messages::Reply*>(m);
}

int main(int argc, char* argv[]) {
Context con;
int NUM_SERVERS = con.settings.get<vector<string>>("network.nodes").size();
vector<string> nodes = con.settings.get<vector<string>>("network.nodes");
vector<FileInfo> total;

for (int net_id = 0; net_id < NUM_SERVERS; net_id++) {
FormatRequest fr;
tcp::socket* socket = connect(net_id);
send_message(socket, &fr);
auto reply = read_reply(socket);

if (reply->message != "OK") {
cerr << "Failed to upload file. Details: " << reply->details << endl;
delete reply;
return EXIT_FAILURE;
}
delete reply;

socket->close();
delete socket;
}
return EXIT_SUCCESS;
}
17 changes: 0 additions & 17 deletions src/fs/dfsinit.cc

This file was deleted.

177 changes: 58 additions & 119 deletions src/fs/dfsput.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,39 +78,25 @@ int main(int argc, char* argv[])
for(int i=1; i<argc; i++)
{
string file_name = argv[i];
ifstream myfile (file_name);
myfile.seekg(1, myfile.end);
uint64_t file_size = myfile.tellg();
uint32_t start = 0;
uint32_t end = start + BLOCK_SIZE - 1;
uint32_t file_hash_key = h (file_name);
ifstream myfile (argv[i]);
uint64_t start = 0;
uint64_t end = start + BLOCK_SIZE - 1;
uint32_t block_size = 0;
unsigned int block_seq = 0;
unsigned int num_blocks = (file_size / BLOCK_SIZE) + 1;

auto socket = connect(file_hash_key);

//TODO: remote_metadata_server = lookup(hkey);
//int remote_metadata_server = 1;
FileInfo file_info;
file_info.file_name = file_name;
file_info.file_hash_key = file_hash_key;
file_info.file_size = file_size;
file_info.num_block = num_blocks;
file_info.file_hash_key = h(file_name);
file_info.replica = con.settings.get<int>("filesystem.replica");
myfile.seekg(0, myfile.end);
file_info.file_size = myfile.tellg();
BlockInfo block_info;

send_message(socket, &file_info);
auto reply = read_reply (socket);

if (reply->message != "OK") {
cerr << "Failed to upload file. Details: " << reply->details << endl;
delete reply;
return EXIT_FAILURE;
}
delete reply;
auto socket = connect(file_info.file_hash_key);

while(1)
{
if(end < file_size)
if(end < file_info.file_size)
{
myfile.seekg(start+BLOCK_SIZE-1, myfile.beg);
while(1)
Expand All @@ -126,106 +112,59 @@ int main(int argc, char* argv[])
end--;
}
}
BlockInfo block_info;
bzero(chunk, BLOCK_SIZE);
myfile.seekg(start, myfile.beg);
block_info.content.reserve(end-start);
myfile.read(chunk, end-start);
block_info.content = chunk;


uint32_t block_size = end - start;
start = end + 1;
end = start + BLOCK_SIZE - 1;

unsigned int block_hash_key = rand()%NUM_SERVERS;
//TODO: int remote_server = lookup(block_hash_key);
//int remote_server = 1;

block_info.file_name = file_name;
block_info.block_seq = block_seq;
block_info.block_hash_key = block_hash_key;
block_info.block_name = file_info.file_name + "_" + to_string(block_seq++);
block_info.block_size = block_size;
block_info.is_inter = 0;
block_info.node = "1.1.1.1";
block_info.l_node = "1.1.1.0";
block_info.r_node = "1.1.1.2";
//block_info.node = remote_server.ip_address;
//Node l_node = lookup((block_hash_key-1+NUM_SERVERS)%NUM_SERVERS);
//Node r_node = lookup((block_hash_key+1+NUM_SERVERS)%NUM_SERVERS);
//block_info.l_node = l_node.ip_address;
//block_info.r_node = r_node.ip_address;
file_info.num_block = block_seq;


send_message(socket, &block_info);
auto reply = read_reply (socket);

if (reply->message != "OK") {
cerr << "Failed to upload file. Details: " << reply->details << endl;
delete reply;
return EXIT_FAILURE;
}
delete reply;

//TODO: remote_metadata_server.update_file_metadata(fileinfo.file_name, file_info);
//cout << "remote_metadata_server.update_file_metadata(fileinfo.file_name, file_info);" << endl;

//TODO: remote_metadata_server.insert_block_metadata(blockinfo);
//cout << "remote_metadata_server.insert_block_metadata(blockinfo);" << endl;

//TODO: remote_server.send_buff(block_info.block_name, buff);
//cout << "remote_server.send_buff(block_info.block_name, buff);" << endl;
//remote_server.send_buff(block_hash_key, buff);
// this function should call FileIO.insert_block(_metadata) in remote metadata server?

// TODO: remote node part
}
else // last block
{
bzero(chunk, BLOCK_SIZE);
BlockInfo block_info;
myfile.seekg(start, myfile.beg);
block_info.content.reserve(end-start);
myfile.read(chunk, end-start);
block_info.content = chunk;

uint32_t block_size = end - start;

uint32_t block_hash_key = rand()%NUM_SERVERS;

// TODO: remote_server = lookup(block_hash_key);
//cout << "remote_server = lookup(block_hash_key);" << endl;

block_info.file_name = file_name;
block_info.block_seq = block_seq;
block_info.block_hash_key = block_hash_key;
block_info.block_name = file_name + "_" + to_string(block_seq++);
block_info.block_size = block_size;
block_info.is_inter = 0;
block_info.node = "1.1.1.1";
block_info.l_node = "1.1.1.0";
block_info.r_node = "1.1.1.2";
//block_info.node = remote_server.ip_address;
//Node l_node = lookup((block_hash_key-1+NUM_SERVERS)%NUM_SERVERS);
//Node r_node = lookup((block_hash_key+1+NUM_SERVERS)%NUM_SERVERS);
//block_info.l_node = l_node.ip_address;
//block_info.r_node = r_node.ip_address;
file_info.num_block = block_seq;

send_message(socket, &block_info);
auto reply = read_reply (socket);

if (reply->message != "OK") {
cerr << "Failed to upload file. Details: " << reply->details << endl;
delete reply;
return EXIT_FAILURE;
}
block_size = (uint32_t) end - start;
bzero(chunk, BLOCK_SIZE);
myfile.seekg(start, myfile.beg);
block_info.content.reserve(block_size);
myfile.read(chunk, block_size);
block_info.content = chunk;

block_info.block_name = file_name + "_" + to_string(block_seq++);
block_info.file_name = file_name;
block_info.block_seq = block_seq;
block_info.block_hash_key = (unsigned int) rand()%NUM_SERVERS;
block_info.block_size = block_size;
block_info.is_inter = 0;
block_info.node = "1.1.1.1";
block_info.l_node = "1.1.1.0";
block_info.r_node = "1.1.1.2";
//block_info.node = remote_server.ip_address;
//Node l_node = lookup((block_hash_key-1+NUM_SERVERS)%NUM_SERVERS);
//Node r_node = lookup((block_hash_key+1+NUM_SERVERS)%NUM_SERVERS);
//block_info.l_node = l_node.ip_address;
//block_info.r_node = r_node.ip_address;

send_message(socket, &block_info);
auto reply = read_reply (socket);

if (reply->message != "OK") {
cerr << "Failed to upload file. Details: " << reply->details << endl;
delete reply;
return EXIT_FAILURE;
}
delete reply;

if(end >= file_info.file_size)
{
break;
}
start = end + 1;
end = start + BLOCK_SIZE - 1;
}

file_info.num_block = block_seq;

send_message(socket, &file_info);
auto reply = read_reply (socket);

if (reply->message != "OK") {
cerr << "Failed to upload file. Details: " << reply->details << endl;
delete reply;
return EXIT_FAILURE;
}
delete reply;

socket->close();
delete socket;
myfile.close();
Expand Down
Loading

0 comments on commit edfedd6

Please sign in to comment.