diff options
Diffstat (limited to 'src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch')
-rw-r--r-- | src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch | 767 |
1 files changed, 0 insertions, 767 deletions
diff --git a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch deleted file mode 100644 index 4adec93..0000000 --- a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch +++ /dev/null @@ -1,767 +0,0 @@ -From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Wed, 5 Sep 2018 14:40:54 +0800 -Subject: [PATCH 09/10] librbd: clean up on rbd shared cache - -Signed-off-by: Dehao Shang <dehao.shang@intel.com> -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/common/options.cc | 4 + - .../SharedPersistentObjectCacherObjectDispatch.cc | 28 ++- - .../SharedPersistentObjectCacherObjectDispatch.h | 3 +- - src/tools/rbd_cache/CacheController.cc | 11 +- - src/tools/rbd_cache/CacheController.h | 1 - - src/tools/rbd_cache/CacheControllerSocket.hpp | 213 +++++++++++++------ - .../rbd_cache/CacheControllerSocketClient.hpp | 226 ++++++++++++++------- - src/tools/rbd_cache/CacheControllerSocketCommon.h | 2 + - 8 files changed, 340 insertions(+), 148 deletions(-) - -diff --git a/src/common/options.cc b/src/common/options.cc -index b334c1e..3172744 100644 ---- a/src/common/options.cc -+++ b/src/common/options.cc -@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() { - .set_default("/tmp") - .set_description("shared ssd caching data dir"), - -+ Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED) -+ .set_default("/tmp/rbd_shared_ro_cache_sock") -+ .set_description("shared ssd caching domain socket"), -+ - Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED) - .set_default(4096) - .set_description("shared ssd caching data entries"), -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -index 407ce49..7cbc019 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -@@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje - delete m_cache_client; - } - -+// TODO if connect fails, init will return error to high layer. - template <typename I> - void SharedPersistentObjectCacherObjectDispatch<I>::init() { - auto cct = m_image_ctx->cct; -@@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() { - return; - } - -- ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; -+ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; - -- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -- m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(), -- ([&](std::string s){client_handle_request(s);})); -+ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); -+ m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(), -+ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); - - int ret = m_cache_client->connect(); - if (ret < 0) { -@@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read( - io::ExtentMap* extent_map, int* object_dispatch_flags, - io::DispatchResult* dispatch_result, Context** on_finish, - Context* on_dispatched) { -+ - // IO chained in reverse order -+ -+ // Now, policy is : when session have any error, later all read will dispatched to rados layer. -+ if(!m_cache_client->is_session_work()) { -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ return true; -+ // TODO : domain socket have error, all read operation will dispatched to rados layer. -+ } -+ - auto cct = m_image_ctx->cct; - ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" - << object_len << dendl; - -+ - on_dispatched = util::create_async_context_callback(*m_image_ctx, - on_dispatched); - auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { - handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); - }); - -- if (m_cache_client && m_cache_client->connected && m_object_store) { -+ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { - m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), - m_image_ctx->id, oid, ctx); - } -@@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( - // try to read from parent image - if (cache) { - int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -+ //int r = object_len; - if (r != 0) { - *dispatch_result = io::DISPATCH_RESULT_COMPLETE; - //TODO(): complete in syncfile -@@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( - return false; - } - } -- - template <typename I> - void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { - auto cct = m_image_ctx->cct; -@@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s - - switch (io_ctx->type) { - case rbd::cache::RBDSC_REGISTER_REPLY: { -- // open cache handler for volume -+ // open cache handler for volume - ldout(cct, 20) << "SRO cache client open cache handler" << dendl; - m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); - -@@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s - } - default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; - break; -- -+ - } - } - -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -index 36b868a..5685244 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -@@ -118,12 +118,11 @@ private: - uint64_t object_len, ceph::bufferlist* read_data, - io::DispatchResult* dispatch_result, - Context* on_dispatched); -+ void client_handle_request(std::string msg); - - ImageCtxT* m_image_ctx; - -- void client_handle_request(std::string msg); - rbd::cache::CacheClient *m_cache_client = nullptr; -- boost::asio::io_service io_service; - }; - - } // namespace cache -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -index c9d674b..620192c 100644 ---- a/src/tools/rbd_cache/CacheController.cc -+++ b/src/tools/rbd_cache/CacheController.cc -@@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){} - void CacheController::run() { - try { - //TODO(): use new socket path -- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -+ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); - std::remove(controller_path.c_str()); - -- m_cache_server = new CacheServer(io_service, controller_path, -- ([&](uint64_t p, std::string s){handle_request(p, s);})); -- io_service.run(); -+ m_cache_server = new CacheServer(controller_path, -+ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); -+ m_cache_server->run(); - } catch (std::exception& e) { - std::cerr << "Exception: " << e.what() << "\n"; - } -@@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){ - - break; - } -- -+ std::cout<<"can't recongize request"<<std::endl; -+ assert(0); // TODO replace it. - } - } - -diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h -index 0e3abc1..0e23484 100644 ---- a/src/tools/rbd_cache/CacheController.h -+++ b/src/tools/rbd_cache/CacheController.h -@@ -41,7 +41,6 @@ class CacheController { - void handle_request(uint64_t sesstion_id, std::string msg); - - private: -- boost::asio::io_service io_service; - CacheServer *m_cache_server; - std::vector<const char*> m_args; - CephContext *m_cct; -diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp -index d178b58..2ff7477 100644 ---- a/src/tools/rbd_cache/CacheControllerSocket.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp -@@ -11,6 +11,7 @@ - #include <string> - #include <boost/bind.hpp> - #include <boost/asio.hpp> -+#include <boost/asio/error.hpp> - #include <boost/algorithm/string.hpp> - #include "CacheControllerSocketCommon.h" - -@@ -23,110 +24,202 @@ namespace cache { - class session : public std::enable_shared_from_this<session> { - public: - session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) -- : session_id(session_id), socket_(io_service), process_msg(processmsg) {} -+ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {} - - stream_protocol::socket& socket() { -- return socket_; -+ return m_dm_socket; - } - - void start() { -- -- boost::asio::async_read(socket_, boost::asio::buffer(data_), -- boost::asio::transfer_exactly(544), -+ if(true) { -+ serial_handing_request(); -+ } else { -+ parallel_handing_request(); -+ } -+ } -+ // flow: -+ // -+ // recv request --> process request --> reply ack -+ // | | -+ // --------------<------------------------- -+ void serial_handing_request() { -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), - boost::bind(&session::handle_read, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ } - -+ // flow : -+ // -+ // --> thread 1: process request -+ // recv request --> thread 2: process request --> reply ack -+ // --> thread n: process request -+ // -+ void parallel_handing_request() { -+ // TODO - } - -+private: -+ - void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { -+ // when recv eof, the most proble is that client side close socket. -+ // so, server side need to end handing_request -+ if(error == boost::asio::error::eof) { -+ std::cout<<"session: async_read : " << error.message() << std::endl; -+ return; -+ } - -- if (!error) { -- if(bytes_transferred != 544){ -- assert(0); -- } -- process_msg(session_id, std::string(data_, bytes_transferred)); -+ if(error) { -+ std::cout<<"session: async_read fails: " << error.message() << std::endl; -+ assert(0); -+ } - -+ if(bytes_transferred != RBDSC_MSG_LEN) { -+ std::cout<<"session : request in-complete. "<<std::endl; -+ assert(0); - } -+ -+ // TODO async_process can increse coding readable. -+ // process_msg_callback call handle async_send -+ process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); - } - -- void handle_write(const boost::system::error_code& error) { -- if (!error) { -- boost::asio::async_read(socket_, boost::asio::buffer(data_), -- boost::asio::transfer_exactly(544), -- boost::bind(&session::handle_read, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -+ void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { -+ if (error) { -+ std::cout<<"session: async_write fails: " << error.message() << std::endl; -+ assert(0); - } -+ -+ if(bytes_transferred != RBDSC_MSG_LEN) { -+ std::cout<<"session : reply in-complete. "<<std::endl; -+ assert(0); -+ } -+ -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ boost::bind(&session::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ - } - -+public: - void send(std::string msg) { -- -- boost::asio::async_write(socket_, -+ boost::asio::async_write(m_dm_socket, - boost::asio::buffer(msg.c_str(), msg.size()), -- boost::asio::transfer_exactly(544), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), - boost::bind(&session::handle_write, -- shared_from_this(), -- boost::asio::placeholders::error)); -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); - - } - - private: -- uint64_t session_id; -- stream_protocol::socket socket_; -+ uint64_t m_session_id; -+ stream_protocol::socket m_dm_socket; - ProcessMsg process_msg; - - // Buffer used to store data received from the client. - //std::array<char, 1024> data_; -- char data_[1024]; -+ char m_buffer[1024]; - }; - - typedef std::shared_ptr<session> session_ptr; - - class CacheServer { - public: -- CacheServer(boost::asio::io_service& io_service, -- const std::string& file, ProcessMsg processmsg) -- : io_service_(io_service), -- server_process_msg(processmsg), -- acceptor_(io_service, stream_protocol::endpoint(file)) -- { -- session_ptr new_session(new session(session_id, io_service_, server_process_msg)); -- acceptor_.async_accept(new_session->socket(), -- boost::bind(&CacheServer::handle_accept, this, new_session, -- boost::asio::placeholders::error)); -- } -- -- void handle_accept(session_ptr new_session, -- const boost::system::error_code& error) -- { -- //TODO(): open librbd snap -- if (!error) { -- new_session->start(); -- session_map.emplace(session_id, new_session); -- session_id++; -- new_session.reset(new session(session_id, io_service_, server_process_msg)); -- acceptor_.async_accept(new_session->socket(), -- boost::bind(&CacheServer::handle_accept, this, new_session, -- boost::asio::placeholders::error)); -+ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) -+ : m_cct(cct), m_server_process_msg(processmsg), -+ m_local_path(file), -+ m_acceptor(m_io_service) -+ {} -+ -+ void run() { -+ bool ret; -+ ret = start_accept(); -+ if(!ret) { -+ return; - } -+ m_io_service.run(); - } - -+ // TODO : use callback to replace this function. - void send(uint64_t session_id, std::string msg) { -- auto it = session_map.find(session_id); -- if (it != session_map.end()) { -+ auto it = m_session_map.find(session_id); -+ if (it != m_session_map.end()) { - it->second->send(msg); -+ } else { -+ // TODO : why don't find existing session id ? -+ std::cout<<"don't find session id..."<<std::endl; -+ assert(0); -+ } -+ } -+ -+private: -+ // when creating one acceptor, can control every step in this way. -+ bool start_accept() { -+ boost::system::error_code ec; -+ m_acceptor.open(m_local_path.protocol(), ec); -+ if(ec) { -+ std::cout << "m_acceptor open fails: " << ec.message() << std::endl; -+ return false; -+ } -+ -+ // TODO control acceptor attribute. -+ -+ m_acceptor.bind(m_local_path, ec); -+ if(ec) { -+ std::cout << "m_acceptor bind fails: " << ec.message() << std::endl; -+ return false; -+ } -+ -+ m_acceptor.listen(boost::asio::socket_base::max_connections, ec); -+ if(ec) { -+ std::cout << "m_acceptor listen fails: " << ec.message() << std::endl; -+ return false; - } -+ -+ accept(); -+ return true; -+ } -+ -+ void accept() { -+ session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg)); -+ m_acceptor.async_accept(new_session->socket(), -+ boost::bind(&CacheServer::handle_accept, this, new_session, -+ boost::asio::placeholders::error)); -+ } -+ -+ void handle_accept(session_ptr new_session, const boost::system::error_code& error) { -+ //TODO(): open librbd snap ... yuan -+ -+ if(error) { -+ std::cout << "async accept fails : " << error.message() << std::endl; -+ assert(0); // TODO -+ } -+ -+ // must put session into m_session_map at the front of session.start() -+ m_session_map.emplace(m_session_id, new_session); -+ // TODO : session setting -+ new_session->start(); -+ m_session_id++; -+ -+ // lanuch next accept -+ accept(); - } - - private: -- boost::asio::io_service& io_service_; -- ProcessMsg server_process_msg; -- stream_protocol::acceptor acceptor_; -- uint64_t session_id = 1; -- std::map<uint64_t, session_ptr> session_map; -+ CephContext* m_cct; -+ boost::asio::io_service m_io_service; // TODO wrapper it. -+ ProcessMsg m_server_process_msg; -+ stream_protocol::endpoint m_local_path; -+ stream_protocol::acceptor m_acceptor; -+ uint64_t m_session_id = 1; -+ std::map<uint64_t, session_ptr> m_session_map; - }; - - } // namespace cache -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -index 3b0ca00..964f888 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -@@ -4,9 +4,12 @@ - #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H - #define CACHE_CONTROLLER_SOCKET_CLIENT_H - -+#include <atomic> - #include <boost/asio.hpp> - #include <boost/bind.hpp> -+#include <boost/asio/error.hpp> - #include <boost/algorithm/string.hpp> -+#include "librbd/ImageCtx.h" - #include "include/assert.h" - #include "include/Context.h" - #include "CacheControllerSocketCommon.h" -@@ -19,32 +22,64 @@ namespace cache { - - class CacheClient { - public: -- CacheClient(boost::asio::io_service& io_service, -- const std::string& file, ClientProcessMsg processmsg) -- : io_service_(io_service), -- io_service_work_(io_service), -- socket_(io_service), -+ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) -+ : m_io_service_work(m_io_service), -+ m_dm_socket(m_io_service), - m_client_process_msg(processmsg), -- ep_(stream_protocol::endpoint(file)) -+ m_ep(stream_protocol::endpoint(file)), -+ m_session_work(false), -+ cct(ceph_ctx) - { -- io_thread.reset(new std::thread([this](){io_service_.run(); })); -+ // TODO wrapper io_service -+ std::thread thd([this](){ -+ m_io_service.run();}); -+ thd.detach(); - } - -- ~CacheClient() { -- io_service_.stop(); -- io_thread->join(); -+ void run(){ - } - -- void run(){ -- } -+ bool is_session_work() { -+ return m_session_work.load() == true; -+ } -+ -+ // just when error occur, call this method. -+ void close() { -+ m_session_work.store(false); -+ boost::system::error_code close_ec; -+ m_dm_socket.close(close_ec); -+ if(close_ec) { -+ std::cout << "close: " << close_ec.message() << std::endl; -+ } -+ std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl; -+ } - - int connect() { -- try { -- socket_.connect(ep_); -- } catch (std::exception& e) { -+ boost::system::error_code ec; -+ m_dm_socket.connect(m_ep, ec); -+ if(ec) { -+ if(ec == boost::asio::error::connection_refused) { -+ std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. " -+ << "Now data will be read from ceph cluster " << std::endl; -+ } else { -+ std::cout << "connect: " << ec.message() << std::endl; -+ } -+ -+ if(m_dm_socket.is_open()) { -+ // Set to indicate what error occurred, if any. -+ // Note that, even if the function indicates an error, -+ // the underlying descriptor is closed. -+ boost::system::error_code close_ec; -+ m_dm_socket.close(close_ec); -+ if(close_ec) { -+ std::cout << "close: " << close_ec.message() << std::endl; -+ } -+ } - return -1; - } -- connected = true; -+ -+ std::cout<<"connect success"<<std::endl; -+ - return 0; - } - -@@ -57,27 +92,51 @@ public: - message->vol_size = vol_size; - message->offset = 0; - message->length = 0; -- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -- [this, message](const boost::system::error_code& err, size_t cb) { -- delete message; -- if (!err) { -- boost::asio::async_read(socket_, boost::asio::buffer(buffer_), -- boost::asio::transfer_exactly(544), -- [this](const boost::system::error_code& err, size_t cb) { -- if (!err) { -- m_client_process_msg(std::string(buffer_, cb)); -- } else { -- return -1; -- } -- }); -- } else { -- return -1; -- } -- }); -+ -+ uint64_t ret; -+ boost::system::error_code ec; -+ -+ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); -+ if(ec) { -+ std::cout << "write fails : " << ec.message() << std::endl; -+ return -1; -+ } -+ -+ if(ret != message->size()) { -+ std::cout << "write fails : ret != send_bytes "<< std::endl; -+ return -1; -+ } -+ -+ // hard code TODO -+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); -+ if(ec == boost::asio::error::eof) { -+ std::cout<< "recv eof"<<std::endl; -+ return -1; -+ } -+ -+ if(ec) { -+ std::cout << "write fails : " << ec.message() << std::endl; -+ return -1; -+ } -+ -+ if(ret != RBDSC_MSG_LEN) { -+ std::cout << "write fails : ret != receive bytes " << std::endl; -+ return -1; -+ } -+ -+ m_client_process_msg(std::string(m_recv_buffer, ret)); -+ -+ delete message; -+ -+ std::cout << "register volume success" << std::endl; -+ -+ // TODO -+ m_session_work.store(true); - - return 0; - } - -+ // if occur any error, we just return false. Then read from rados. - int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { - rbdsc_req_type_t *message = new rbdsc_req_type_t(); - message->type = RBDSC_READ; -@@ -87,59 +146,82 @@ public: - message->offset = 0; - message->length = 0; - -- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -+ boost::asio::async_write(m_dm_socket, -+ boost::asio::buffer((char*)message, message->size()), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), - [this, on_finish, message](const boost::system::error_code& err, size_t cb) { -- delete message; -- if (!err) { -+ delete message; -+ if(err) { -+ std::cout<< "lookup_object: async_write fails." << err.message() << std::endl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ if(cb != RBDSC_MSG_LEN) { -+ std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } - get_result(on_finish); -- } else { -- return -1; -- } - }); - - return 0; - } - - void get_result(Context* on_finish) { -- boost::asio::async_read(socket_, boost::asio::buffer(buffer_), -- boost::asio::transfer_exactly(544), -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), - [this, on_finish](const boost::system::error_code& err, size_t cb) { -- if (cb != 544) { -- assert(0); -- } -- if (!err) { -- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); -- if (io_ctx->type == RBDSC_READ_REPLY) { -- on_finish->complete(true); -- return; -- } else { -- on_finish->complete(false); -- return; -- } -- } else { -- assert(0); -- return on_finish->complete(false); -- } -+ if(err == boost::asio::error::eof) { -+ std::cout<<"get_result: ack is EOF." << std::endl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ if(err) { -+ std::cout<< "get_result: async_read fails:" << err.message() << std::endl; -+ close(); -+ on_finish->complete(false); // TODO replace this assert with some metohds. -+ return; -+ } -+ if (cb != RBDSC_MSG_LEN) { -+ close(); -+ std::cout << "get_result: in-complete ack." << std::endl; -+ on_finish->complete(false); // TODO: replace this assert with some methods. -+ } -+ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); -+ -+ // TODO: re-occur yuan's bug -+ if(io_ctx->type == RBDSC_READ) { -+ std::cout << "get rbdsc_read... " << std::endl; -+ assert(0); -+ } -+ -+ if (io_ctx->type == RBDSC_READ_REPLY) { -+ on_finish->complete(true); -+ return; -+ } else { -+ on_finish->complete(false); -+ return; -+ } - }); - } - -- - private: -- boost::asio::io_service& io_service_; -- boost::asio::io_service::work io_service_work_; -- stream_protocol::socket socket_; -- -- std::shared_ptr<std::thread> io_thread; -+ boost::asio::io_service m_io_service; -+ boost::asio::io_service::work m_io_service_work; -+ stream_protocol::socket m_dm_socket; - ClientProcessMsg m_client_process_msg; -- stream_protocol::endpoint ep_; -- char buffer_[1024]; -- int block_size_ = 1024; -- -- std::condition_variable cv; -- std::mutex m; -- --public: -- bool connected = false; -+ stream_protocol::endpoint m_ep; -+ char m_recv_buffer[1024]; -+ -+ // atomic modfiy for this variable. -+ // thread 1 : asio callback thread modify it. -+ // thread 2 : librbd read it. -+ std::atomic<bool> m_session_work; -+ CephContext* cct; - }; - - } // namespace cache -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -index e026ec8..e17529a 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h -+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h -@@ -55,6 +55,8 @@ struct rbdsc_req_type_t { - } - }; - -+static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); -+ - } // namespace cache - } // namespace rbd - #endif --- -2.7.4 - |