summaryrefslogtreecommitdiffstats
path: root/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch')
-rw-r--r--src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch767
1 files changed, 0 insertions, 767 deletions
diff --git a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
deleted file mode 100644
index 4adec93..0000000
--- a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch
+++ /dev/null
@@ -1,767 +0,0 @@
-From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001
-From: Yuan Zhou <yuan.zhou@intel.com>
-Date: Wed, 5 Sep 2018 14:40:54 +0800
-Subject: [PATCH 09/10] librbd: clean up on rbd shared cache
-
-Signed-off-by: Dehao Shang <dehao.shang@intel.com>
-Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
----
- src/common/options.cc | 4 +
- .../SharedPersistentObjectCacherObjectDispatch.cc | 28 ++-
- .../SharedPersistentObjectCacherObjectDispatch.h | 3 +-
- src/tools/rbd_cache/CacheController.cc | 11 +-
- src/tools/rbd_cache/CacheController.h | 1 -
- src/tools/rbd_cache/CacheControllerSocket.hpp | 213 +++++++++++++------
- .../rbd_cache/CacheControllerSocketClient.hpp | 226 ++++++++++++++-------
- src/tools/rbd_cache/CacheControllerSocketCommon.h | 2 +
- 8 files changed, 340 insertions(+), 148 deletions(-)
-
-diff --git a/src/common/options.cc b/src/common/options.cc
-index b334c1e..3172744 100644
---- a/src/common/options.cc
-+++ b/src/common/options.cc
-@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() {
- .set_default("/tmp")
- .set_description("shared ssd caching data dir"),
-
-+ Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED)
-+ .set_default("/tmp/rbd_shared_ro_cache_sock")
-+ .set_description("shared ssd caching domain socket"),
-+
- Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED)
- .set_default(4096)
- .set_description("shared ssd caching data entries"),
-diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
-index 407ce49..7cbc019 100644
---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
-+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
-@@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje
- delete m_cache_client;
- }
-
-+// TODO if connect fails, init will return error to high layer.
- template <typename I>
- void SharedPersistentObjectCacherObjectDispatch<I>::init() {
- auto cct = m_image_ctx->cct;
-@@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() {
- return;
- }
-
-- ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl;
-+ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
-
-- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
-- m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(),
-- ([&](std::string s){client_handle_request(s);}));
-+ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
-+ m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
-+ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
-
- int ret = m_cache_client->connect();
- if (ret < 0) {
-@@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
- io::ExtentMap* extent_map, int* object_dispatch_flags,
- io::DispatchResult* dispatch_result, Context** on_finish,
- Context* on_dispatched) {
-+
- // IO chained in reverse order
-+
-+ // Now, policy is : when session have any error, later all read will dispatched to rados layer.
-+ if(!m_cache_client->is_session_work()) {
-+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-+ on_dispatched->complete(0);
-+ return true;
-+ // TODO : domain socket have error, all read operation will dispatched to rados layer.
-+ }
-+
- auto cct = m_image_ctx->cct;
- ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
- << object_len << dendl;
-
-+
- on_dispatched = util::create_async_context_callback(*m_image_ctx,
- on_dispatched);
- auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
- handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
- });
-
-- if (m_cache_client && m_cache_client->connected && m_object_store) {
-+ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
- m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
- m_image_ctx->id, oid, ctx);
- }
-@@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
- // try to read from parent image
- if (cache) {
- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
-+ //int r = object_len;
- if (r != 0) {
- *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
- //TODO(): complete in syncfile
-@@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
- return false;
- }
- }
--
- template <typename I>
- void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
- auto cct = m_image_ctx->cct;
-@@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
-
- switch (io_ctx->type) {
- case rbd::cache::RBDSC_REGISTER_REPLY: {
-- // open cache handler for volume
-+ // open cache handler for volume
- ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
- m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
-
-@@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
- }
- default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
- break;
--
-+
- }
- }
-
-diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
-index 36b868a..5685244 100644
---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
-+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
-@@ -118,12 +118,11 @@ private:
- uint64_t object_len, ceph::bufferlist* read_data,
- io::DispatchResult* dispatch_result,
- Context* on_dispatched);
-+ void client_handle_request(std::string msg);
-
- ImageCtxT* m_image_ctx;
-
-- void client_handle_request(std::string msg);
- rbd::cache::CacheClient *m_cache_client = nullptr;
-- boost::asio::io_service io_service;
- };
-
- } // namespace cache
-diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
-index c9d674b..620192c 100644
---- a/src/tools/rbd_cache/CacheController.cc
-+++ b/src/tools/rbd_cache/CacheController.cc
-@@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){}
- void CacheController::run() {
- try {
- //TODO(): use new socket path
-- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
-+ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
- std::remove(controller_path.c_str());
-
-- m_cache_server = new CacheServer(io_service, controller_path,
-- ([&](uint64_t p, std::string s){handle_request(p, s);}));
-- io_service.run();
-+ m_cache_server = new CacheServer(controller_path,
-+ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
-+ m_cache_server->run();
- } catch (std::exception& e) {
- std::cerr << "Exception: " << e.what() << "\n";
- }
-@@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){
-
- break;
- }
--
-+ std::cout<<"can't recongize request"<<std::endl;
-+ assert(0); // TODO replace it.
- }
- }
-
-diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
-index 0e3abc1..0e23484 100644
---- a/src/tools/rbd_cache/CacheController.h
-+++ b/src/tools/rbd_cache/CacheController.h
-@@ -41,7 +41,6 @@ class CacheController {
- void handle_request(uint64_t sesstion_id, std::string msg);
-
- private:
-- boost::asio::io_service io_service;
- CacheServer *m_cache_server;
- std::vector<const char*> m_args;
- CephContext *m_cct;
-diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
-index d178b58..2ff7477 100644
---- a/src/tools/rbd_cache/CacheControllerSocket.hpp
-+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
-@@ -11,6 +11,7 @@
- #include <string>
- #include <boost/bind.hpp>
- #include <boost/asio.hpp>
-+#include <boost/asio/error.hpp>
- #include <boost/algorithm/string.hpp>
- #include "CacheControllerSocketCommon.h"
-
-@@ -23,110 +24,202 @@ namespace cache {
- class session : public std::enable_shared_from_this<session> {
- public:
- session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
-- : session_id(session_id), socket_(io_service), process_msg(processmsg) {}
-+ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
-
- stream_protocol::socket& socket() {
-- return socket_;
-+ return m_dm_socket;
- }
-
- void start() {
--
-- boost::asio::async_read(socket_, boost::asio::buffer(data_),
-- boost::asio::transfer_exactly(544),
-+ if(true) {
-+ serial_handing_request();
-+ } else {
-+ parallel_handing_request();
-+ }
-+ }
-+ // flow:
-+ //
-+ // recv request --> process request --> reply ack
-+ // | |
-+ // --------------<-------------------------
-+ void serial_handing_request() {
-+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- boost::bind(&session::handle_read,
-- shared_from_this(),
-- boost::asio::placeholders::error,
-- boost::asio::placeholders::bytes_transferred));
-+ shared_from_this(),
-+ boost::asio::placeholders::error,
-+ boost::asio::placeholders::bytes_transferred));
-+ }
-
-+ // flow :
-+ //
-+ // --> thread 1: process request
-+ // recv request --> thread 2: process request --> reply ack
-+ // --> thread n: process request
-+ //
-+ void parallel_handing_request() {
-+ // TODO
- }
-
-+private:
-+
- void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
-+ // when recv eof, the most proble is that client side close socket.
-+ // so, server side need to end handing_request
-+ if(error == boost::asio::error::eof) {
-+ std::cout<<"session: async_read : " << error.message() << std::endl;
-+ return;
-+ }
-
-- if (!error) {
-- if(bytes_transferred != 544){
-- assert(0);
-- }
-- process_msg(session_id, std::string(data_, bytes_transferred));
-+ if(error) {
-+ std::cout<<"session: async_read fails: " << error.message() << std::endl;
-+ assert(0);
-+ }
-
-+ if(bytes_transferred != RBDSC_MSG_LEN) {
-+ std::cout<<"session : request in-complete. "<<std::endl;
-+ assert(0);
- }
-+
-+ // TODO async_process can increse coding readable.
-+ // process_msg_callback call handle async_send
-+ process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
- }
-
-- void handle_write(const boost::system::error_code& error) {
-- if (!error) {
-- boost::asio::async_read(socket_, boost::asio::buffer(data_),
-- boost::asio::transfer_exactly(544),
-- boost::bind(&session::handle_read,
-- shared_from_this(),
-- boost::asio::placeholders::error,
-- boost::asio::placeholders::bytes_transferred));
-+ void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
-+ if (error) {
-+ std::cout<<"session: async_write fails: " << error.message() << std::endl;
-+ assert(0);
- }
-+
-+ if(bytes_transferred != RBDSC_MSG_LEN) {
-+ std::cout<<"session : reply in-complete. "<<std::endl;
-+ assert(0);
-+ }
-+
-+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-+ boost::bind(&session::handle_read,
-+ shared_from_this(),
-+ boost::asio::placeholders::error,
-+ boost::asio::placeholders::bytes_transferred));
-+
- }
-
-+public:
- void send(std::string msg) {
--
-- boost::asio::async_write(socket_,
-+ boost::asio::async_write(m_dm_socket,
- boost::asio::buffer(msg.c_str(), msg.size()),
-- boost::asio::transfer_exactly(544),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- boost::bind(&session::handle_write,
-- shared_from_this(),
-- boost::asio::placeholders::error));
-+ shared_from_this(),
-+ boost::asio::placeholders::error,
-+ boost::asio::placeholders::bytes_transferred));
-
- }
-
- private:
-- uint64_t session_id;
-- stream_protocol::socket socket_;
-+ uint64_t m_session_id;
-+ stream_protocol::socket m_dm_socket;
- ProcessMsg process_msg;
-
- // Buffer used to store data received from the client.
- //std::array<char, 1024> data_;
-- char data_[1024];
-+ char m_buffer[1024];
- };
-
- typedef std::shared_ptr<session> session_ptr;
-
- class CacheServer {
- public:
-- CacheServer(boost::asio::io_service& io_service,
-- const std::string& file, ProcessMsg processmsg)
-- : io_service_(io_service),
-- server_process_msg(processmsg),
-- acceptor_(io_service, stream_protocol::endpoint(file))
-- {
-- session_ptr new_session(new session(session_id, io_service_, server_process_msg));
-- acceptor_.async_accept(new_session->socket(),
-- boost::bind(&CacheServer::handle_accept, this, new_session,
-- boost::asio::placeholders::error));
-- }
--
-- void handle_accept(session_ptr new_session,
-- const boost::system::error_code& error)
-- {
-- //TODO(): open librbd snap
-- if (!error) {
-- new_session->start();
-- session_map.emplace(session_id, new_session);
-- session_id++;
-- new_session.reset(new session(session_id, io_service_, server_process_msg));
-- acceptor_.async_accept(new_session->socket(),
-- boost::bind(&CacheServer::handle_accept, this, new_session,
-- boost::asio::placeholders::error));
-+ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
-+ : m_cct(cct), m_server_process_msg(processmsg),
-+ m_local_path(file),
-+ m_acceptor(m_io_service)
-+ {}
-+
-+ void run() {
-+ bool ret;
-+ ret = start_accept();
-+ if(!ret) {
-+ return;
- }
-+ m_io_service.run();
- }
-
-+ // TODO : use callback to replace this function.
- void send(uint64_t session_id, std::string msg) {
-- auto it = session_map.find(session_id);
-- if (it != session_map.end()) {
-+ auto it = m_session_map.find(session_id);
-+ if (it != m_session_map.end()) {
- it->second->send(msg);
-+ } else {
-+ // TODO : why don't find existing session id ?
-+ std::cout<<"don't find session id..."<<std::endl;
-+ assert(0);
-+ }
-+ }
-+
-+private:
-+ // when creating one acceptor, can control every step in this way.
-+ bool start_accept() {
-+ boost::system::error_code ec;
-+ m_acceptor.open(m_local_path.protocol(), ec);
-+ if(ec) {
-+ std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
-+ return false;
-+ }
-+
-+ // TODO control acceptor attribute.
-+
-+ m_acceptor.bind(m_local_path, ec);
-+ if(ec) {
-+ std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
-+ return false;
-+ }
-+
-+ m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
-+ if(ec) {
-+ std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
-+ return false;
- }
-+
-+ accept();
-+ return true;
-+ }
-+
-+ void accept() {
-+ session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
-+ m_acceptor.async_accept(new_session->socket(),
-+ boost::bind(&CacheServer::handle_accept, this, new_session,
-+ boost::asio::placeholders::error));
-+ }
-+
-+ void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
-+ //TODO(): open librbd snap ... yuan
-+
-+ if(error) {
-+ std::cout << "async accept fails : " << error.message() << std::endl;
-+ assert(0); // TODO
-+ }
-+
-+ // must put session into m_session_map at the front of session.start()
-+ m_session_map.emplace(m_session_id, new_session);
-+ // TODO : session setting
-+ new_session->start();
-+ m_session_id++;
-+
-+ // lanuch next accept
-+ accept();
- }
-
- private:
-- boost::asio::io_service& io_service_;
-- ProcessMsg server_process_msg;
-- stream_protocol::acceptor acceptor_;
-- uint64_t session_id = 1;
-- std::map<uint64_t, session_ptr> session_map;
-+ CephContext* m_cct;
-+ boost::asio::io_service m_io_service; // TODO wrapper it.
-+ ProcessMsg m_server_process_msg;
-+ stream_protocol::endpoint m_local_path;
-+ stream_protocol::acceptor m_acceptor;
-+ uint64_t m_session_id = 1;
-+ std::map<uint64_t, session_ptr> m_session_map;
- };
-
- } // namespace cache
-diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
-index 3b0ca00..964f888 100644
---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
-+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
-@@ -4,9 +4,12 @@
- #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
- #define CACHE_CONTROLLER_SOCKET_CLIENT_H
-
-+#include <atomic>
- #include <boost/asio.hpp>
- #include <boost/bind.hpp>
-+#include <boost/asio/error.hpp>
- #include <boost/algorithm/string.hpp>
-+#include "librbd/ImageCtx.h"
- #include "include/assert.h"
- #include "include/Context.h"
- #include "CacheControllerSocketCommon.h"
-@@ -19,32 +22,64 @@ namespace cache {
-
- class CacheClient {
- public:
-- CacheClient(boost::asio::io_service& io_service,
-- const std::string& file, ClientProcessMsg processmsg)
-- : io_service_(io_service),
-- io_service_work_(io_service),
-- socket_(io_service),
-+ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
-+ : m_io_service_work(m_io_service),
-+ m_dm_socket(m_io_service),
- m_client_process_msg(processmsg),
-- ep_(stream_protocol::endpoint(file))
-+ m_ep(stream_protocol::endpoint(file)),
-+ m_session_work(false),
-+ cct(ceph_ctx)
- {
-- io_thread.reset(new std::thread([this](){io_service_.run(); }));
-+ // TODO wrapper io_service
-+ std::thread thd([this](){
-+ m_io_service.run();});
-+ thd.detach();
- }
-
-- ~CacheClient() {
-- io_service_.stop();
-- io_thread->join();
-+ void run(){
- }
-
-- void run(){
-- }
-+ bool is_session_work() {
-+ return m_session_work.load() == true;
-+ }
-+
-+ // just when error occur, call this method.
-+ void close() {
-+ m_session_work.store(false);
-+ boost::system::error_code close_ec;
-+ m_dm_socket.close(close_ec);
-+ if(close_ec) {
-+ std::cout << "close: " << close_ec.message() << std::endl;
-+ }
-+ std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
-+ }
-
- int connect() {
-- try {
-- socket_.connect(ep_);
-- } catch (std::exception& e) {
-+ boost::system::error_code ec;
-+ m_dm_socket.connect(m_ep, ec);
-+ if(ec) {
-+ if(ec == boost::asio::error::connection_refused) {
-+ std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. "
-+ << "Now data will be read from ceph cluster " << std::endl;
-+ } else {
-+ std::cout << "connect: " << ec.message() << std::endl;
-+ }
-+
-+ if(m_dm_socket.is_open()) {
-+ // Set to indicate what error occurred, if any.
-+ // Note that, even if the function indicates an error,
-+ // the underlying descriptor is closed.
-+ boost::system::error_code close_ec;
-+ m_dm_socket.close(close_ec);
-+ if(close_ec) {
-+ std::cout << "close: " << close_ec.message() << std::endl;
-+ }
-+ }
- return -1;
- }
-- connected = true;
-+
-+ std::cout<<"connect success"<<std::endl;
-+
- return 0;
- }
-
-@@ -57,27 +92,51 @@ public:
- message->vol_size = vol_size;
- message->offset = 0;
- message->length = 0;
-- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
-- [this, message](const boost::system::error_code& err, size_t cb) {
-- delete message;
-- if (!err) {
-- boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
-- boost::asio::transfer_exactly(544),
-- [this](const boost::system::error_code& err, size_t cb) {
-- if (!err) {
-- m_client_process_msg(std::string(buffer_, cb));
-- } else {
-- return -1;
-- }
-- });
-- } else {
-- return -1;
-- }
-- });
-+
-+ uint64_t ret;
-+ boost::system::error_code ec;
-+
-+ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
-+ if(ec) {
-+ std::cout << "write fails : " << ec.message() << std::endl;
-+ return -1;
-+ }
-+
-+ if(ret != message->size()) {
-+ std::cout << "write fails : ret != send_bytes "<< std::endl;
-+ return -1;
-+ }
-+
-+ // hard code TODO
-+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
-+ if(ec == boost::asio::error::eof) {
-+ std::cout<< "recv eof"<<std::endl;
-+ return -1;
-+ }
-+
-+ if(ec) {
-+ std::cout << "write fails : " << ec.message() << std::endl;
-+ return -1;
-+ }
-+
-+ if(ret != RBDSC_MSG_LEN) {
-+ std::cout << "write fails : ret != receive bytes " << std::endl;
-+ return -1;
-+ }
-+
-+ m_client_process_msg(std::string(m_recv_buffer, ret));
-+
-+ delete message;
-+
-+ std::cout << "register volume success" << std::endl;
-+
-+ // TODO
-+ m_session_work.store(true);
-
- return 0;
- }
-
-+ // if occur any error, we just return false. Then read from rados.
- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
- rbdsc_req_type_t *message = new rbdsc_req_type_t();
- message->type = RBDSC_READ;
-@@ -87,59 +146,82 @@ public:
- message->offset = 0;
- message->length = 0;
-
-- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
-+ boost::asio::async_write(m_dm_socket,
-+ boost::asio::buffer((char*)message, message->size()),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
-- delete message;
-- if (!err) {
-+ delete message;
-+ if(err) {
-+ std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
-+ close();
-+ on_finish->complete(false);
-+ return;
-+ }
-+ if(cb != RBDSC_MSG_LEN) {
-+ std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
-+ close();
-+ on_finish->complete(false);
-+ return;
-+ }
- get_result(on_finish);
-- } else {
-- return -1;
-- }
- });
-
- return 0;
- }
-
- void get_result(Context* on_finish) {
-- boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
-- boost::asio::transfer_exactly(544),
-+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
- [this, on_finish](const boost::system::error_code& err, size_t cb) {
-- if (cb != 544) {
-- assert(0);
-- }
-- if (!err) {
-- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
-- if (io_ctx->type == RBDSC_READ_REPLY) {
-- on_finish->complete(true);
-- return;
-- } else {
-- on_finish->complete(false);
-- return;
-- }
-- } else {
-- assert(0);
-- return on_finish->complete(false);
-- }
-+ if(err == boost::asio::error::eof) {
-+ std::cout<<"get_result: ack is EOF." << std::endl;
-+ close();
-+ on_finish->complete(false);
-+ return;
-+ }
-+ if(err) {
-+ std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
-+ close();
-+ on_finish->complete(false); // TODO replace this assert with some metohds.
-+ return;
-+ }
-+ if (cb != RBDSC_MSG_LEN) {
-+ close();
-+ std::cout << "get_result: in-complete ack." << std::endl;
-+ on_finish->complete(false); // TODO: replace this assert with some methods.
-+ }
-+
-+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
-+
-+ // TODO: re-occur yuan's bug
-+ if(io_ctx->type == RBDSC_READ) {
-+ std::cout << "get rbdsc_read... " << std::endl;
-+ assert(0);
-+ }
-+
-+ if (io_ctx->type == RBDSC_READ_REPLY) {
-+ on_finish->complete(true);
-+ return;
-+ } else {
-+ on_finish->complete(false);
-+ return;
-+ }
- });
- }
-
--
- private:
-- boost::asio::io_service& io_service_;
-- boost::asio::io_service::work io_service_work_;
-- stream_protocol::socket socket_;
--
-- std::shared_ptr<std::thread> io_thread;
-+ boost::asio::io_service m_io_service;
-+ boost::asio::io_service::work m_io_service_work;
-+ stream_protocol::socket m_dm_socket;
- ClientProcessMsg m_client_process_msg;
-- stream_protocol::endpoint ep_;
-- char buffer_[1024];
-- int block_size_ = 1024;
--
-- std::condition_variable cv;
-- std::mutex m;
--
--public:
-- bool connected = false;
-+ stream_protocol::endpoint m_ep;
-+ char m_recv_buffer[1024];
-+
-+ // atomic modfiy for this variable.
-+ // thread 1 : asio callback thread modify it.
-+ // thread 2 : librbd read it.
-+ std::atomic<bool> m_session_work;
-+ CephContext* cct;
- };
-
- } // namespace cache
-diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
-index e026ec8..e17529a 100644
---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
-+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
-@@ -55,6 +55,8 @@ struct rbdsc_req_type_t {
- }
- };
-
-+static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
-+
- } // namespace cache
- } // namespace rbd
- #endif
---
-2.7.4
-