diff options
Diffstat (limited to 'src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch')
-rw-r--r-- | src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch | 767 |
1 files changed, 767 insertions, 0 deletions
diff --git a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch new file mode 100644 index 0000000..4adec93 --- /dev/null +++ b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch @@ -0,0 +1,767 @@ +From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Wed, 5 Sep 2018 14:40:54 +0800 +Subject: [PATCH 09/10] librbd: clean up on rbd shared cache + +Signed-off-by: Dehao Shang <dehao.shang@intel.com> +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/common/options.cc | 4 + + .../SharedPersistentObjectCacherObjectDispatch.cc | 28 ++- + .../SharedPersistentObjectCacherObjectDispatch.h | 3 +- + src/tools/rbd_cache/CacheController.cc | 11 +- + src/tools/rbd_cache/CacheController.h | 1 - + src/tools/rbd_cache/CacheControllerSocket.hpp | 213 +++++++++++++------ + .../rbd_cache/CacheControllerSocketClient.hpp | 226 ++++++++++++++------- + src/tools/rbd_cache/CacheControllerSocketCommon.h | 2 + + 8 files changed, 340 insertions(+), 148 deletions(-) + +diff --git a/src/common/options.cc b/src/common/options.cc +index b334c1e..3172744 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() { + .set_default("/tmp") + .set_description("shared ssd caching data dir"), + ++ Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED) ++ .set_default("/tmp/rbd_shared_ro_cache_sock") ++ .set_description("shared ssd caching domain socket"), ++ + Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED) + .set_default(4096) + .set_description("shared ssd caching data entries"), +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +index 407ce49..7cbc019 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +@@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje + delete m_cache_client; + } + ++// TODO if connect fails, init will return error to high layer. + template <typename I> + void SharedPersistentObjectCacherObjectDispatch<I>::init() { + auto cct = m_image_ctx->cct; +@@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() { + return; + } + +- ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; ++ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; + +- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; +- m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(), +- ([&](std::string s){client_handle_request(s);})); ++ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); ++ m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(), ++ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); + + int ret = m_cache_client->connect(); + if (ret < 0) { +@@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read( + io::ExtentMap* extent_map, int* object_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { ++ + // IO chained in reverse order ++ ++ // Now, policy is : when session have any error, later all read will dispatched to rados layer. ++ if(!m_cache_client->is_session_work()) { ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ return true; ++ // TODO : domain socket have error, all read operation will dispatched to rados layer. ++ } ++ + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" + << object_len << dendl; + ++ + on_dispatched = util::create_async_context_callback(*m_image_ctx, + on_dispatched); + auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { + handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); + }); + +- if (m_cache_client && m_cache_client->connected && m_object_store) { ++ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { + m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), + m_image_ctx->id, oid, ctx); + } +@@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( + // try to read from parent image + if (cache) { + int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); ++ //int r = object_len; + if (r != 0) { + *dispatch_result = io::DISPATCH_RESULT_COMPLETE; + //TODO(): complete in syncfile +@@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( + return false; + } + } +- + template <typename I> + void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { + auto cct = m_image_ctx->cct; +@@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s + + switch (io_ctx->type) { + case rbd::cache::RBDSC_REGISTER_REPLY: { +- // open cache handler for volume ++ // open cache handler for volume + ldout(cct, 20) << "SRO cache client open cache handler" << dendl; + m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); + +@@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s + } + default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; + break; +- ++ + } + } + +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +index 36b868a..5685244 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +@@ -118,12 +118,11 @@ private: + uint64_t object_len, ceph::bufferlist* read_data, + io::DispatchResult* dispatch_result, + Context* on_dispatched); ++ void client_handle_request(std::string msg); + + ImageCtxT* m_image_ctx; + +- void client_handle_request(std::string msg); + rbd::cache::CacheClient *m_cache_client = nullptr; +- boost::asio::io_service io_service; + }; + + } // namespace cache +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +index c9d674b..620192c 100644 +--- a/src/tools/rbd_cache/CacheController.cc ++++ b/src/tools/rbd_cache/CacheController.cc +@@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){} + void CacheController::run() { + try { + //TODO(): use new socket path +- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; ++ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); + std::remove(controller_path.c_str()); + +- m_cache_server = new CacheServer(io_service, controller_path, +- ([&](uint64_t p, std::string s){handle_request(p, s);})); +- io_service.run(); ++ m_cache_server = new CacheServer(controller_path, ++ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); ++ m_cache_server->run(); + } catch (std::exception& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } +@@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){ + + break; + } +- ++ std::cout<<"can't recongize request"<<std::endl; ++ assert(0); // TODO replace it. + } + } + +diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h +index 0e3abc1..0e23484 100644 +--- a/src/tools/rbd_cache/CacheController.h ++++ b/src/tools/rbd_cache/CacheController.h +@@ -41,7 +41,6 @@ class CacheController { + void handle_request(uint64_t sesstion_id, std::string msg); + + private: +- boost::asio::io_service io_service; + CacheServer *m_cache_server; + std::vector<const char*> m_args; + CephContext *m_cct; +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +index d178b58..2ff7477 100644 +--- a/src/tools/rbd_cache/CacheControllerSocket.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp +@@ -11,6 +11,7 @@ + #include <string> + #include <boost/bind.hpp> + #include <boost/asio.hpp> ++#include <boost/asio/error.hpp> + #include <boost/algorithm/string.hpp> + #include "CacheControllerSocketCommon.h" + +@@ -23,110 +24,202 @@ namespace cache { + class session : public std::enable_shared_from_this<session> { + public: + session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) +- : session_id(session_id), socket_(io_service), process_msg(processmsg) {} ++ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {} + + stream_protocol::socket& socket() { +- return socket_; ++ return m_dm_socket; + } + + void start() { +- +- boost::asio::async_read(socket_, boost::asio::buffer(data_), +- boost::asio::transfer_exactly(544), ++ if(true) { ++ serial_handing_request(); ++ } else { ++ parallel_handing_request(); ++ } ++ } ++ // flow: ++ // ++ // recv request --> process request --> reply ack ++ // | | ++ // --------------<------------------------- ++ void serial_handing_request() { ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), + boost::bind(&session::handle_read, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ } + ++ // flow : ++ // ++ // --> thread 1: process request ++ // recv request --> thread 2: process request --> reply ack ++ // --> thread n: process request ++ // ++ void parallel_handing_request() { ++ // TODO + } + ++private: ++ + void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { ++ // when recv eof, the most proble is that client side close socket. ++ // so, server side need to end handing_request ++ if(error == boost::asio::error::eof) { ++ std::cout<<"session: async_read : " << error.message() << std::endl; ++ return; ++ } + +- if (!error) { +- if(bytes_transferred != 544){ +- assert(0); +- } +- process_msg(session_id, std::string(data_, bytes_transferred)); ++ if(error) { ++ std::cout<<"session: async_read fails: " << error.message() << std::endl; ++ assert(0); ++ } + ++ if(bytes_transferred != RBDSC_MSG_LEN) { ++ std::cout<<"session : request in-complete. "<<std::endl; ++ assert(0); + } ++ ++ // TODO async_process can increse coding readable. ++ // process_msg_callback call handle async_send ++ process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); + } + +- void handle_write(const boost::system::error_code& error) { +- if (!error) { +- boost::asio::async_read(socket_, boost::asio::buffer(data_), +- boost::asio::transfer_exactly(544), +- boost::bind(&session::handle_read, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); ++ void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { ++ if (error) { ++ std::cout<<"session: async_write fails: " << error.message() << std::endl; ++ assert(0); + } ++ ++ if(bytes_transferred != RBDSC_MSG_LEN) { ++ std::cout<<"session : reply in-complete. "<<std::endl; ++ assert(0); ++ } ++ ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ boost::bind(&session::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ + } + ++public: + void send(std::string msg) { +- +- boost::asio::async_write(socket_, ++ boost::asio::async_write(m_dm_socket, + boost::asio::buffer(msg.c_str(), msg.size()), +- boost::asio::transfer_exactly(544), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), + boost::bind(&session::handle_write, +- shared_from_this(), +- boost::asio::placeholders::error)); ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); + + } + + private: +- uint64_t session_id; +- stream_protocol::socket socket_; ++ uint64_t m_session_id; ++ stream_protocol::socket m_dm_socket; + ProcessMsg process_msg; + + // Buffer used to store data received from the client. + //std::array<char, 1024> data_; +- char data_[1024]; ++ char m_buffer[1024]; + }; + + typedef std::shared_ptr<session> session_ptr; + + class CacheServer { + public: +- CacheServer(boost::asio::io_service& io_service, +- const std::string& file, ProcessMsg processmsg) +- : io_service_(io_service), +- server_process_msg(processmsg), +- acceptor_(io_service, stream_protocol::endpoint(file)) +- { +- session_ptr new_session(new session(session_id, io_service_, server_process_msg)); +- acceptor_.async_accept(new_session->socket(), +- boost::bind(&CacheServer::handle_accept, this, new_session, +- boost::asio::placeholders::error)); +- } +- +- void handle_accept(session_ptr new_session, +- const boost::system::error_code& error) +- { +- //TODO(): open librbd snap +- if (!error) { +- new_session->start(); +- session_map.emplace(session_id, new_session); +- session_id++; +- new_session.reset(new session(session_id, io_service_, server_process_msg)); +- acceptor_.async_accept(new_session->socket(), +- boost::bind(&CacheServer::handle_accept, this, new_session, +- boost::asio::placeholders::error)); ++ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) ++ : m_cct(cct), m_server_process_msg(processmsg), ++ m_local_path(file), ++ m_acceptor(m_io_service) ++ {} ++ ++ void run() { ++ bool ret; ++ ret = start_accept(); ++ if(!ret) { ++ return; + } ++ m_io_service.run(); + } + ++ // TODO : use callback to replace this function. + void send(uint64_t session_id, std::string msg) { +- auto it = session_map.find(session_id); +- if (it != session_map.end()) { ++ auto it = m_session_map.find(session_id); ++ if (it != m_session_map.end()) { + it->second->send(msg); ++ } else { ++ // TODO : why don't find existing session id ? ++ std::cout<<"don't find session id..."<<std::endl; ++ assert(0); ++ } ++ } ++ ++private: ++ // when creating one acceptor, can control every step in this way. ++ bool start_accept() { ++ boost::system::error_code ec; ++ m_acceptor.open(m_local_path.protocol(), ec); ++ if(ec) { ++ std::cout << "m_acceptor open fails: " << ec.message() << std::endl; ++ return false; ++ } ++ ++ // TODO control acceptor attribute. ++ ++ m_acceptor.bind(m_local_path, ec); ++ if(ec) { ++ std::cout << "m_acceptor bind fails: " << ec.message() << std::endl; ++ return false; ++ } ++ ++ m_acceptor.listen(boost::asio::socket_base::max_connections, ec); ++ if(ec) { ++ std::cout << "m_acceptor listen fails: " << ec.message() << std::endl; ++ return false; + } ++ ++ accept(); ++ return true; ++ } ++ ++ void accept() { ++ session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg)); ++ m_acceptor.async_accept(new_session->socket(), ++ boost::bind(&CacheServer::handle_accept, this, new_session, ++ boost::asio::placeholders::error)); ++ } ++ ++ void handle_accept(session_ptr new_session, const boost::system::error_code& error) { ++ //TODO(): open librbd snap ... yuan ++ ++ if(error) { ++ std::cout << "async accept fails : " << error.message() << std::endl; ++ assert(0); // TODO ++ } ++ ++ // must put session into m_session_map at the front of session.start() ++ m_session_map.emplace(m_session_id, new_session); ++ // TODO : session setting ++ new_session->start(); ++ m_session_id++; ++ ++ // lanuch next accept ++ accept(); + } + + private: +- boost::asio::io_service& io_service_; +- ProcessMsg server_process_msg; +- stream_protocol::acceptor acceptor_; +- uint64_t session_id = 1; +- std::map<uint64_t, session_ptr> session_map; ++ CephContext* m_cct; ++ boost::asio::io_service m_io_service; // TODO wrapper it. ++ ProcessMsg m_server_process_msg; ++ stream_protocol::endpoint m_local_path; ++ stream_protocol::acceptor m_acceptor; ++ uint64_t m_session_id = 1; ++ std::map<uint64_t, session_ptr> m_session_map; + }; + + } // namespace cache +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +index 3b0ca00..964f888 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -4,9 +4,12 @@ + #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H + #define CACHE_CONTROLLER_SOCKET_CLIENT_H + ++#include <atomic> + #include <boost/asio.hpp> + #include <boost/bind.hpp> ++#include <boost/asio/error.hpp> + #include <boost/algorithm/string.hpp> ++#include "librbd/ImageCtx.h" + #include "include/assert.h" + #include "include/Context.h" + #include "CacheControllerSocketCommon.h" +@@ -19,32 +22,64 @@ namespace cache { + + class CacheClient { + public: +- CacheClient(boost::asio::io_service& io_service, +- const std::string& file, ClientProcessMsg processmsg) +- : io_service_(io_service), +- io_service_work_(io_service), +- socket_(io_service), ++ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) ++ : m_io_service_work(m_io_service), ++ m_dm_socket(m_io_service), + m_client_process_msg(processmsg), +- ep_(stream_protocol::endpoint(file)) ++ m_ep(stream_protocol::endpoint(file)), ++ m_session_work(false), ++ cct(ceph_ctx) + { +- io_thread.reset(new std::thread([this](){io_service_.run(); })); ++ // TODO wrapper io_service ++ std::thread thd([this](){ ++ m_io_service.run();}); ++ thd.detach(); + } + +- ~CacheClient() { +- io_service_.stop(); +- io_thread->join(); ++ void run(){ + } + +- void run(){ +- } ++ bool is_session_work() { ++ return m_session_work.load() == true; ++ } ++ ++ // just when error occur, call this method. ++ void close() { ++ m_session_work.store(false); ++ boost::system::error_code close_ec; ++ m_dm_socket.close(close_ec); ++ if(close_ec) { ++ std::cout << "close: " << close_ec.message() << std::endl; ++ } ++ std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl; ++ } + + int connect() { +- try { +- socket_.connect(ep_); +- } catch (std::exception& e) { ++ boost::system::error_code ec; ++ m_dm_socket.connect(m_ep, ec); ++ if(ec) { ++ if(ec == boost::asio::error::connection_refused) { ++ std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. " ++ << "Now data will be read from ceph cluster " << std::endl; ++ } else { ++ std::cout << "connect: " << ec.message() << std::endl; ++ } ++ ++ if(m_dm_socket.is_open()) { ++ // Set to indicate what error occurred, if any. ++ // Note that, even if the function indicates an error, ++ // the underlying descriptor is closed. ++ boost::system::error_code close_ec; ++ m_dm_socket.close(close_ec); ++ if(close_ec) { ++ std::cout << "close: " << close_ec.message() << std::endl; ++ } ++ } + return -1; + } +- connected = true; ++ ++ std::cout<<"connect success"<<std::endl; ++ + return 0; + } + +@@ -57,27 +92,51 @@ public: + message->vol_size = vol_size; + message->offset = 0; + message->length = 0; +- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), +- [this, message](const boost::system::error_code& err, size_t cb) { +- delete message; +- if (!err) { +- boost::asio::async_read(socket_, boost::asio::buffer(buffer_), +- boost::asio::transfer_exactly(544), +- [this](const boost::system::error_code& err, size_t cb) { +- if (!err) { +- m_client_process_msg(std::string(buffer_, cb)); +- } else { +- return -1; +- } +- }); +- } else { +- return -1; +- } +- }); ++ ++ uint64_t ret; ++ boost::system::error_code ec; ++ ++ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); ++ if(ec) { ++ std::cout << "write fails : " << ec.message() << std::endl; ++ return -1; ++ } ++ ++ if(ret != message->size()) { ++ std::cout << "write fails : ret != send_bytes "<< std::endl; ++ return -1; ++ } ++ ++ // hard code TODO ++ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); ++ if(ec == boost::asio::error::eof) { ++ std::cout<< "recv eof"<<std::endl; ++ return -1; ++ } ++ ++ if(ec) { ++ std::cout << "write fails : " << ec.message() << std::endl; ++ return -1; ++ } ++ ++ if(ret != RBDSC_MSG_LEN) { ++ std::cout << "write fails : ret != receive bytes " << std::endl; ++ return -1; ++ } ++ ++ m_client_process_msg(std::string(m_recv_buffer, ret)); ++ ++ delete message; ++ ++ std::cout << "register volume success" << std::endl; ++ ++ // TODO ++ m_session_work.store(true); + + return 0; + } + ++ // if occur any error, we just return false. Then read from rados. + int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { + rbdsc_req_type_t *message = new rbdsc_req_type_t(); + message->type = RBDSC_READ; +@@ -87,59 +146,82 @@ public: + message->offset = 0; + message->length = 0; + +- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), ++ boost::asio::async_write(m_dm_socket, ++ boost::asio::buffer((char*)message, message->size()), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), + [this, on_finish, message](const boost::system::error_code& err, size_t cb) { +- delete message; +- if (!err) { ++ delete message; ++ if(err) { ++ std::cout<< "lookup_object: async_write fails." << err.message() << std::endl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ if(cb != RBDSC_MSG_LEN) { ++ std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } + get_result(on_finish); +- } else { +- return -1; +- } + }); + + return 0; + } + + void get_result(Context* on_finish) { +- boost::asio::async_read(socket_, boost::asio::buffer(buffer_), +- boost::asio::transfer_exactly(544), ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), + [this, on_finish](const boost::system::error_code& err, size_t cb) { +- if (cb != 544) { +- assert(0); +- } +- if (!err) { +- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); +- if (io_ctx->type == RBDSC_READ_REPLY) { +- on_finish->complete(true); +- return; +- } else { +- on_finish->complete(false); +- return; +- } +- } else { +- assert(0); +- return on_finish->complete(false); +- } ++ if(err == boost::asio::error::eof) { ++ std::cout<<"get_result: ack is EOF." << std::endl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ if(err) { ++ std::cout<< "get_result: async_read fails:" << err.message() << std::endl; ++ close(); ++ on_finish->complete(false); // TODO replace this assert with some metohds. ++ return; ++ } ++ if (cb != RBDSC_MSG_LEN) { ++ close(); ++ std::cout << "get_result: in-complete ack." << std::endl; ++ on_finish->complete(false); // TODO: replace this assert with some methods. ++ } ++ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); ++ ++ // TODO: re-occur yuan's bug ++ if(io_ctx->type == RBDSC_READ) { ++ std::cout << "get rbdsc_read... " << std::endl; ++ assert(0); ++ } ++ ++ if (io_ctx->type == RBDSC_READ_REPLY) { ++ on_finish->complete(true); ++ return; ++ } else { ++ on_finish->complete(false); ++ return; ++ } + }); + } + +- + private: +- boost::asio::io_service& io_service_; +- boost::asio::io_service::work io_service_work_; +- stream_protocol::socket socket_; +- +- std::shared_ptr<std::thread> io_thread; ++ boost::asio::io_service m_io_service; ++ boost::asio::io_service::work m_io_service_work; ++ stream_protocol::socket m_dm_socket; + ClientProcessMsg m_client_process_msg; +- stream_protocol::endpoint ep_; +- char buffer_[1024]; +- int block_size_ = 1024; +- +- std::condition_variable cv; +- std::mutex m; +- +-public: +- bool connected = false; ++ stream_protocol::endpoint m_ep; ++ char m_recv_buffer[1024]; ++ ++ // atomic modfiy for this variable. ++ // thread 1 : asio callback thread modify it. ++ // thread 2 : librbd read it. ++ std::atomic<bool> m_session_work; ++ CephContext* cct; + }; + + } // namespace cache +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +index e026ec8..e17529a 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h ++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h +@@ -55,6 +55,8 @@ struct rbdsc_req_type_t { + } + }; + ++static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); ++ + } // namespace cache + } // namespace rbd + #endif +-- +2.7.4 + |