summaryrefslogtreecommitdiffstats
path: root/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
diff options
context:
space:
mode:
authorQiaowei Ren <qiaowei.ren@intel.com>2018-09-11 10:54:20 +0800
committerQiaowei Ren <qiaowei.ren@intel.com>2018-09-11 10:54:20 +0800
commitd65e22d27ab305d38059046dae60d7a66ff4a4e0 (patch)
treed0bd06459534ef33d0b930f6c164c4501bd527d7 /src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
parent828acdd1d5c5c2aeef287aa69e473bf44fcbce70 (diff)
ceph: shared persistent read-only rbd cache
This patch introduces introduces RBD shared persistent RO cache which can provide client-side sharing cache for rbd clone/snapshot case. Change-Id: Icad8063f4f10b1ab4ce31920e90d5affa7d0abdc Signed-off-by: Qiaowei Ren <qiaowei.ren@intel.com> Signed-off-by: Dehao Shang <dehao.shang@intel.com> Signed-off-by: Tushar Gohad <tushar.gohad@intel.com> Signed-off-by: Jason Dillaman <dillaman@redhat.com> Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
Diffstat (limited to 'src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch')
-rw-r--r--src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch767
1 files changed, 767 insertions, 0 deletions
diff --git a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
new file mode 100644
index 0000000..4adec93
--- /dev/null
+++ b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
@@ -0,0 +1,767 @@
+From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001
+From: Yuan Zhou <yuan.zhou@intel.com>
+Date: Wed, 5 Sep 2018 14:40:54 +0800
+Subject: [PATCH 09/10] librbd: clean up on rbd shared cache
+
+Signed-off-by: Dehao Shang <dehao.shang@intel.com>
+Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
+---
+ src/common/options.cc | 4 +
+ .../SharedPersistentObjectCacherObjectDispatch.cc | 28 ++-
+ .../SharedPersistentObjectCacherObjectDispatch.h | 3 +-
+ src/tools/rbd_cache/CacheController.cc | 11 +-
+ src/tools/rbd_cache/CacheController.h | 1 -
+ src/tools/rbd_cache/CacheControllerSocket.hpp | 213 +++++++++++++------
+ .../rbd_cache/CacheControllerSocketClient.hpp | 226 ++++++++++++++-------
+ src/tools/rbd_cache/CacheControllerSocketCommon.h | 2 +
+ 8 files changed, 340 insertions(+), 148 deletions(-)
+
+diff --git a/src/common/options.cc b/src/common/options.cc
+index b334c1e..3172744 100644
+--- a/src/common/options.cc
++++ b/src/common/options.cc
+@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() {
+ .set_default("/tmp")
+ .set_description("shared ssd caching data dir"),
+
++ Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED)
++ .set_default("/tmp/rbd_shared_ro_cache_sock")
++ .set_description("shared ssd caching domain socket"),
++
+ Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+ .set_default(4096)
+ .set_description("shared ssd caching data entries"),
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+index 407ce49..7cbc019 100644
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+@@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje
+ delete m_cache_client;
+ }
+
++// TODO if connect fails, init will return error to high layer.
+ template <typename I>
+ void SharedPersistentObjectCacherObjectDispatch<I>::init() {
+ auto cct = m_image_ctx->cct;
+@@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() {
+ return;
+ }
+
+- ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl;
++ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
+
+- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
+- m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(),
+- ([&](std::string s){client_handle_request(s);}));
++ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
++ m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
++ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
+
+ int ret = m_cache_client->connect();
+ if (ret < 0) {
+@@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
+ io::ExtentMap* extent_map, int* object_dispatch_flags,
+ io::DispatchResult* dispatch_result, Context** on_finish,
+ Context* on_dispatched) {
++
+ // IO chained in reverse order
++
++ // Now, policy is : when session have any error, later all read will dispatched to rados layer.
++ if(!m_cache_client->is_session_work()) {
++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
++ on_dispatched->complete(0);
++ return true;
++ // TODO : domain socket have error, all read operation will dispatched to rados layer.
++ }
++
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+ << object_len << dendl;
+
++
+ on_dispatched = util::create_async_context_callback(*m_image_ctx,
+ on_dispatched);
+ auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
+ handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
+ });
+
+- if (m_cache_client && m_cache_client->connected && m_object_store) {
++ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
+ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
+ m_image_ctx->id, oid, ctx);
+ }
+@@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
+ // try to read from parent image
+ if (cache) {
+ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
++ //int r = object_len;
+ if (r != 0) {
+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+ //TODO(): complete in syncfile
+@@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
+ return false;
+ }
+ }
+-
+ template <typename I>
+ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
+ auto cct = m_image_ctx->cct;
+@@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
+
+ switch (io_ctx->type) {
+ case rbd::cache::RBDSC_REGISTER_REPLY: {
+- // open cache handler for volume
++ // open cache handler for volume
+ ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
+ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
+
+@@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
+ }
+ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
+ break;
+-
++
+ }
+ }
+
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+index 36b868a..5685244 100644
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+@@ -118,12 +118,11 @@ private:
+ uint64_t object_len, ceph::bufferlist* read_data,
+ io::DispatchResult* dispatch_result,
+ Context* on_dispatched);
++ void client_handle_request(std::string msg);
+
+ ImageCtxT* m_image_ctx;
+
+- void client_handle_request(std::string msg);
+ rbd::cache::CacheClient *m_cache_client = nullptr;
+- boost::asio::io_service io_service;
+ };
+
+ } // namespace cache
+diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
+index c9d674b..620192c 100644
+--- a/src/tools/rbd_cache/CacheController.cc
++++ b/src/tools/rbd_cache/CacheController.cc
+@@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){}
+ void CacheController::run() {
+ try {
+ //TODO(): use new socket path
+- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
++ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
+ std::remove(controller_path.c_str());
+
+- m_cache_server = new CacheServer(io_service, controller_path,
+- ([&](uint64_t p, std::string s){handle_request(p, s);}));
+- io_service.run();
++ m_cache_server = new CacheServer(controller_path,
++ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
++ m_cache_server->run();
+ } catch (std::exception& e) {
+ std::cerr << "Exception: " << e.what() << "\n";
+ }
+@@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){
+
+ break;
+ }
+-
++ std::cout<<"can't recongize request"<<std::endl;
++ assert(0); // TODO replace it.
+ }
+ }
+
+diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
+index 0e3abc1..0e23484 100644
+--- a/src/tools/rbd_cache/CacheController.h
++++ b/src/tools/rbd_cache/CacheController.h
+@@ -41,7 +41,6 @@ class CacheController {
+ void handle_request(uint64_t sesstion_id, std::string msg);
+
+ private:
+- boost::asio::io_service io_service;
+ CacheServer *m_cache_server;
+ std::vector<const char*> m_args;
+ CephContext *m_cct;
+diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
+index d178b58..2ff7477 100644
+--- a/src/tools/rbd_cache/CacheControllerSocket.hpp
++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
+@@ -11,6 +11,7 @@
+ #include <string>
+ #include <boost/bind.hpp>
+ #include <boost/asio.hpp>
++#include <boost/asio/error.hpp>
+ #include <boost/algorithm/string.hpp>
+ #include "CacheControllerSocketCommon.h"
+
+@@ -23,110 +24,202 @@ namespace cache {
+ class session : public std::enable_shared_from_this<session> {
+ public:
+ session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
+- : session_id(session_id), socket_(io_service), process_msg(processmsg) {}
++ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
+
+ stream_protocol::socket& socket() {
+- return socket_;
++ return m_dm_socket;
+ }
+
+ void start() {
+-
+- boost::asio::async_read(socket_, boost::asio::buffer(data_),
+- boost::asio::transfer_exactly(544),
++ if(true) {
++ serial_handing_request();
++ } else {
++ parallel_handing_request();
++ }
++ }
++ // flow:
++ //
++ // recv request --> process request --> reply ack
++ // | |
++ // --------------<-------------------------
++ void serial_handing_request() {
++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+ boost::bind(&session::handle_read,
+- shared_from_this(),
+- boost::asio::placeholders::error,
+- boost::asio::placeholders::bytes_transferred));
++ shared_from_this(),
++ boost::asio::placeholders::error,
++ boost::asio::placeholders::bytes_transferred));
++ }
+
++ // flow :
++ //
++ // --> thread 1: process request
++ // recv request --> thread 2: process request --> reply ack
++ // --> thread n: process request
++ //
++ void parallel_handing_request() {
++ // TODO
+ }
+
++private:
++
+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
++ // when recv eof, the most proble is that client side close socket.
++ // so, server side need to end handing_request
++ if(error == boost::asio::error::eof) {
++ std::cout<<"session: async_read : " << error.message() << std::endl;
++ return;
++ }
+
+- if (!error) {
+- if(bytes_transferred != 544){
+- assert(0);
+- }
+- process_msg(session_id, std::string(data_, bytes_transferred));
++ if(error) {
++ std::cout<<"session: async_read fails: " << error.message() << std::endl;
++ assert(0);
++ }
+
++ if(bytes_transferred != RBDSC_MSG_LEN) {
++ std::cout<<"session : request in-complete. "<<std::endl;
++ assert(0);
+ }
++
++ // TODO async_process can increse coding readable.
++ // process_msg_callback call handle async_send
++ process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
+ }
+
+- void handle_write(const boost::system::error_code& error) {
+- if (!error) {
+- boost::asio::async_read(socket_, boost::asio::buffer(data_),
+- boost::asio::transfer_exactly(544),
+- boost::bind(&session::handle_read,
+- shared_from_this(),
+- boost::asio::placeholders::error,
+- boost::asio::placeholders::bytes_transferred));
++ void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
++ if (error) {
++ std::cout<<"session: async_write fails: " << error.message() << std::endl;
++ assert(0);
+ }
++
++ if(bytes_transferred != RBDSC_MSG_LEN) {
++ std::cout<<"session : reply in-complete. "<<std::endl;
++ assert(0);
++ }
++
++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
++ boost::bind(&session::handle_read,
++ shared_from_this(),
++ boost::asio::placeholders::error,
++ boost::asio::placeholders::bytes_transferred));
++
+ }
+
++public:
+ void send(std::string msg) {
+-
+- boost::asio::async_write(socket_,
++ boost::asio::async_write(m_dm_socket,
+ boost::asio::buffer(msg.c_str(), msg.size()),
+- boost::asio::transfer_exactly(544),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+ boost::bind(&session::handle_write,
+- shared_from_this(),
+- boost::asio::placeholders::error));
++ shared_from_this(),
++ boost::asio::placeholders::error,
++ boost::asio::placeholders::bytes_transferred));
+
+ }
+
+ private:
+- uint64_t session_id;
+- stream_protocol::socket socket_;
++ uint64_t m_session_id;
++ stream_protocol::socket m_dm_socket;
+ ProcessMsg process_msg;
+
+ // Buffer used to store data received from the client.
+ //std::array<char, 1024> data_;
+- char data_[1024];
++ char m_buffer[1024];
+ };
+
+ typedef std::shared_ptr<session> session_ptr;
+
+ class CacheServer {
+ public:
+- CacheServer(boost::asio::io_service& io_service,
+- const std::string& file, ProcessMsg processmsg)
+- : io_service_(io_service),
+- server_process_msg(processmsg),
+- acceptor_(io_service, stream_protocol::endpoint(file))
+- {
+- session_ptr new_session(new session(session_id, io_service_, server_process_msg));
+- acceptor_.async_accept(new_session->socket(),
+- boost::bind(&CacheServer::handle_accept, this, new_session,
+- boost::asio::placeholders::error));
+- }
+-
+- void handle_accept(session_ptr new_session,
+- const boost::system::error_code& error)
+- {
+- //TODO(): open librbd snap
+- if (!error) {
+- new_session->start();
+- session_map.emplace(session_id, new_session);
+- session_id++;
+- new_session.reset(new session(session_id, io_service_, server_process_msg));
+- acceptor_.async_accept(new_session->socket(),
+- boost::bind(&CacheServer::handle_accept, this, new_session,
+- boost::asio::placeholders::error));
++ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
++ : m_cct(cct), m_server_process_msg(processmsg),
++ m_local_path(file),
++ m_acceptor(m_io_service)
++ {}
++
++ void run() {
++ bool ret;
++ ret = start_accept();
++ if(!ret) {
++ return;
+ }
++ m_io_service.run();
+ }
+
++ // TODO : use callback to replace this function.
+ void send(uint64_t session_id, std::string msg) {
+- auto it = session_map.find(session_id);
+- if (it != session_map.end()) {
++ auto it = m_session_map.find(session_id);
++ if (it != m_session_map.end()) {
+ it->second->send(msg);
++ } else {
++ // TODO : why don't find existing session id ?
++ std::cout<<"don't find session id..."<<std::endl;
++ assert(0);
++ }
++ }
++
++private:
++ // when creating one acceptor, can control every step in this way.
++ bool start_accept() {
++ boost::system::error_code ec;
++ m_acceptor.open(m_local_path.protocol(), ec);
++ if(ec) {
++ std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
++ return false;
++ }
++
++ // TODO control acceptor attribute.
++
++ m_acceptor.bind(m_local_path, ec);
++ if(ec) {
++ std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
++ return false;
++ }
++
++ m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
++ if(ec) {
++ std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
++ return false;
+ }
++
++ accept();
++ return true;
++ }
++
++ void accept() {
++ session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
++ m_acceptor.async_accept(new_session->socket(),
++ boost::bind(&CacheServer::handle_accept, this, new_session,
++ boost::asio::placeholders::error));
++ }
++
++ void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
++ //TODO(): open librbd snap ... yuan
++
++ if(error) {
++ std::cout << "async accept fails : " << error.message() << std::endl;
++ assert(0); // TODO
++ }
++
++ // must put session into m_session_map at the front of session.start()
++ m_session_map.emplace(m_session_id, new_session);
++ // TODO : session setting
++ new_session->start();
++ m_session_id++;
++
++ // lanuch next accept
++ accept();
+ }
+
+ private:
+- boost::asio::io_service& io_service_;
+- ProcessMsg server_process_msg;
+- stream_protocol::acceptor acceptor_;
+- uint64_t session_id = 1;
+- std::map<uint64_t, session_ptr> session_map;
++ CephContext* m_cct;
++ boost::asio::io_service m_io_service; // TODO wrapper it.
++ ProcessMsg m_server_process_msg;
++ stream_protocol::endpoint m_local_path;
++ stream_protocol::acceptor m_acceptor;
++ uint64_t m_session_id = 1;
++ std::map<uint64_t, session_ptr> m_session_map;
+ };
+
+ } // namespace cache
+diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+index 3b0ca00..964f888 100644
+--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+@@ -4,9 +4,12 @@
+ #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
+ #define CACHE_CONTROLLER_SOCKET_CLIENT_H
+
++#include <atomic>
+ #include <boost/asio.hpp>
+ #include <boost/bind.hpp>
++#include <boost/asio/error.hpp>
+ #include <boost/algorithm/string.hpp>
++#include "librbd/ImageCtx.h"
+ #include "include/assert.h"
+ #include "include/Context.h"
+ #include "CacheControllerSocketCommon.h"
+@@ -19,32 +22,64 @@ namespace cache {
+
+ class CacheClient {
+ public:
+- CacheClient(boost::asio::io_service& io_service,
+- const std::string& file, ClientProcessMsg processmsg)
+- : io_service_(io_service),
+- io_service_work_(io_service),
+- socket_(io_service),
++ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
++ : m_io_service_work(m_io_service),
++ m_dm_socket(m_io_service),
+ m_client_process_msg(processmsg),
+- ep_(stream_protocol::endpoint(file))
++ m_ep(stream_protocol::endpoint(file)),
++ m_session_work(false),
++ cct(ceph_ctx)
+ {
+- io_thread.reset(new std::thread([this](){io_service_.run(); }));
++ // TODO wrapper io_service
++ std::thread thd([this](){
++ m_io_service.run();});
++ thd.detach();
+ }
+
+- ~CacheClient() {
+- io_service_.stop();
+- io_thread->join();
++ void run(){
+ }
+
+- void run(){
+- }
++ bool is_session_work() {
++ return m_session_work.load() == true;
++ }
++
++ // just when error occur, call this method.
++ void close() {
++ m_session_work.store(false);
++ boost::system::error_code close_ec;
++ m_dm_socket.close(close_ec);
++ if(close_ec) {
++ std::cout << "close: " << close_ec.message() << std::endl;
++ }
++ std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
++ }
+
+ int connect() {
+- try {
+- socket_.connect(ep_);
+- } catch (std::exception& e) {
++ boost::system::error_code ec;
++ m_dm_socket.connect(m_ep, ec);
++ if(ec) {
++ if(ec == boost::asio::error::connection_refused) {
++ std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. "
++ << "Now data will be read from ceph cluster " << std::endl;
++ } else {
++ std::cout << "connect: " << ec.message() << std::endl;
++ }
++
++ if(m_dm_socket.is_open()) {
++ // Set to indicate what error occurred, if any.
++ // Note that, even if the function indicates an error,
++ // the underlying descriptor is closed.
++ boost::system::error_code close_ec;
++ m_dm_socket.close(close_ec);
++ if(close_ec) {
++ std::cout << "close: " << close_ec.message() << std::endl;
++ }
++ }
+ return -1;
+ }
+- connected = true;
++
++ std::cout<<"connect success"<<std::endl;
++
+ return 0;
+ }
+
+@@ -57,27 +92,51 @@ public:
+ message->vol_size = vol_size;
+ message->offset = 0;
+ message->length = 0;
+- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
+- [this, message](const boost::system::error_code& err, size_t cb) {
+- delete message;
+- if (!err) {
+- boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
+- boost::asio::transfer_exactly(544),
+- [this](const boost::system::error_code& err, size_t cb) {
+- if (!err) {
+- m_client_process_msg(std::string(buffer_, cb));
+- } else {
+- return -1;
+- }
+- });
+- } else {
+- return -1;
+- }
+- });
++
++ uint64_t ret;
++ boost::system::error_code ec;
++
++ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
++ if(ec) {
++ std::cout << "write fails : " << ec.message() << std::endl;
++ return -1;
++ }
++
++ if(ret != message->size()) {
++ std::cout << "write fails : ret != send_bytes "<< std::endl;
++ return -1;
++ }
++
++ // hard code TODO
++ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
++ if(ec == boost::asio::error::eof) {
++ std::cout<< "recv eof"<<std::endl;
++ return -1;
++ }
++
++ if(ec) {
++ std::cout << "write fails : " << ec.message() << std::endl;
++ return -1;
++ }
++
++ if(ret != RBDSC_MSG_LEN) {
++ std::cout << "write fails : ret != receive bytes " << std::endl;
++ return -1;
++ }
++
++ m_client_process_msg(std::string(m_recv_buffer, ret));
++
++ delete message;
++
++ std::cout << "register volume success" << std::endl;
++
++ // TODO
++ m_session_work.store(true);
+
+ return 0;
+ }
+
++ // if occur any error, we just return false. Then read from rados.
+ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
+ rbdsc_req_type_t *message = new rbdsc_req_type_t();
+ message->type = RBDSC_READ;
+@@ -87,59 +146,82 @@ public:
+ message->offset = 0;
+ message->length = 0;
+
+- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
++ boost::asio::async_write(m_dm_socket,
++ boost::asio::buffer((char*)message, message->size()),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+ [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
+- delete message;
+- if (!err) {
++ delete message;
++ if(err) {
++ std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
++ close();
++ on_finish->complete(false);
++ return;
++ }
++ if(cb != RBDSC_MSG_LEN) {
++ std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
++ close();
++ on_finish->complete(false);
++ return;
++ }
+ get_result(on_finish);
+- } else {
+- return -1;
+- }
+ });
+
+ return 0;
+ }
+
+ void get_result(Context* on_finish) {
+- boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
+- boost::asio::transfer_exactly(544),
++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+ [this, on_finish](const boost::system::error_code& err, size_t cb) {
+- if (cb != 544) {
+- assert(0);
+- }
+- if (!err) {
+- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
+- if (io_ctx->type == RBDSC_READ_REPLY) {
+- on_finish->complete(true);
+- return;
+- } else {
+- on_finish->complete(false);
+- return;
+- }
+- } else {
+- assert(0);
+- return on_finish->complete(false);
+- }
++ if(err == boost::asio::error::eof) {
++ std::cout<<"get_result: ack is EOF." << std::endl;
++ close();
++ on_finish->complete(false);
++ return;
++ }
++ if(err) {
++ std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
++ close();
++ on_finish->complete(false); // TODO replace this assert with some metohds.
++ return;
++ }
++ if (cb != RBDSC_MSG_LEN) {
++ close();
++ std::cout << "get_result: in-complete ack." << std::endl;
++ on_finish->complete(false); // TODO: replace this assert with some methods.
++ }
++
++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
++
++ // TODO: re-occur yuan's bug
++ if(io_ctx->type == RBDSC_READ) {
++ std::cout << "get rbdsc_read... " << std::endl;
++ assert(0);
++ }
++
++ if (io_ctx->type == RBDSC_READ_REPLY) {
++ on_finish->complete(true);
++ return;
++ } else {
++ on_finish->complete(false);
++ return;
++ }
+ });
+ }
+
+-
+ private:
+- boost::asio::io_service& io_service_;
+- boost::asio::io_service::work io_service_work_;
+- stream_protocol::socket socket_;
+-
+- std::shared_ptr<std::thread> io_thread;
++ boost::asio::io_service m_io_service;
++ boost::asio::io_service::work m_io_service_work;
++ stream_protocol::socket m_dm_socket;
+ ClientProcessMsg m_client_process_msg;
+- stream_protocol::endpoint ep_;
+- char buffer_[1024];
+- int block_size_ = 1024;
+-
+- std::condition_variable cv;
+- std::mutex m;
+-
+-public:
+- bool connected = false;
++ stream_protocol::endpoint m_ep;
++ char m_recv_buffer[1024];
++
++ // atomic modfiy for this variable.
++ // thread 1 : asio callback thread modify it.
++ // thread 2 : librbd read it.
++ std::atomic<bool> m_session_work;
++ CephContext* cct;
+ };
+
+ } // namespace cache
+diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+index e026ec8..e17529a 100644
+--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+@@ -55,6 +55,8 @@ struct rbdsc_req_type_t {
+ }
+ };
+
++static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
++
+ } // namespace cache
+ } // namespace rbd
+ #endif
+--
+2.7.4
+