From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Thu, 16 Aug 2018 17:28:46 +0800 Subject: [PATCH 08/10] librbd: implement async cache lookup and read Signed-off-by: Yuan Zhou --- .../SharedPersistentObjectCacherObjectDispatch.cc | 63 ++++++++++++---------- .../SharedPersistentObjectCacherObjectDispatch.h | 7 +++ src/tools/rbd_cache/CacheController.cc | 9 ++-- src/tools/rbd_cache/CacheControllerSocket.hpp | 8 ++- .../rbd_cache/CacheControllerSocketClient.hpp | 49 +++++++++-------- src/tools/rbd_cache/CacheControllerSocketCommon.h | 12 +++++ 6 files changed, 94 insertions(+), 54 deletions(-) diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc index 2aa5cad..407ce49 100644 --- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc @@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch::SharedPersistentObjectCacherObjec template SharedPersistentObjectCacherObjectDispatch::~SharedPersistentObjectCacherObjectDispatch() { - if (m_object_store) { delete m_object_store; - } - - if (m_cache_client) { delete m_cache_client; - } } template @@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch::read( ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" << object_len << dendl; - // ensure we aren't holding the cache lock post-read on_dispatched = util::create_async_context_callback(*m_image_ctx, on_dispatched); + auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { + handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); + }); if (m_cache_client && m_cache_client->connected && m_object_store) { - bool exists; m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), - m_image_ctx->id, oid, &exists); - - // try to read from parent image - ldout(cct, 20) << "SRO cache object exists:" << exists << dendl; - if (exists) { - int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); - if (r != 0) { - *dispatch_result = io::DISPATCH_RESULT_COMPLETE; - on_dispatched->complete(r); - return true; - } - } + m_image_ctx->id, oid, ctx); } - - ldout(cct, 20) << "Continue read from RADOS" << dendl; - *dispatch_result = io::DISPATCH_RESULT_CONTINUE; - on_dispatched->complete(0); return true; } template +int SharedPersistentObjectCacherObjectDispatch::handle_read_cache( + bool cache, + const std::string &oid, uint64_t object_off, uint64_t object_len, + ceph::bufferlist* read_data, io::DispatchResult* dispatch_result, + Context* on_dispatched) { + // IO chained in reverse order + auto cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + + // try to read from parent image + if (cache) { + int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); + if (r != 0) { + *dispatch_result = io::DISPATCH_RESULT_COMPLETE; + //TODO(): complete in syncfile + on_dispatched->complete(r); + ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <complete(0); + ldout(cct, 20) << "BBB no cache" << *dispatch_result < void SharedPersistentObjectCacherObjectDispatch::client_handle_request(std::string msg) { auto cct = m_image_ctx->cct; ldout(cct, 20) << dendl; @@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch::client_handle_request(std::s rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); switch (io_ctx->type) { - case RBDSC_REGISTER_REPLY: { + case rbd::cache::RBDSC_REGISTER_REPLY: { // open cache handler for volume ldout(cct, 20) << "SRO cache client open cache handler" << dendl; m_object_store = new SharedPersistentObjectCacher(m_image_ctx, m_image_ctx->shared_cache_path); break; } - case RBDSC_READ_REPLY: { + case rbd::cache::RBDSC_READ_REPLY: { ldout(cct, 20) << "SRO cache client start to read cache" << dendl; //TODO(): should call read here break; } - case RBDSC_READ_RADOS: { + case rbd::cache::RBDSC_READ_RADOS: { ldout(cct, 20) << "SRO cache client start to read rados" << dendl; //TODO(): should call read here break; } - default: ldout(cct, 20) << "nothing" << dendl; + default: ldout(cct, 20) << "nothing" << io_ctx->type <init_cache(io_ctx->vol_name, io_ctx->vol_size); io_ctx->type = RBDSC_REGISTER_REPLY; - m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); + m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); break; } @@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ } else { io_ctx->type = RBDSC_READ_REPLY; } - m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); + if (io_ctx->type != RBDSC_READ_REPLY) { + assert(0); + } + m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); break; } diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp index 967af1d..d178b58 100644 --- a/src/tools/rbd_cache/CacheControllerSocket.hpp +++ b/src/tools/rbd_cache/CacheControllerSocket.hpp @@ -43,7 +43,9 @@ public: void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { if (!error) { - + if(bytes_transferred != 544){ + assert(0); + } process_msg(session_id, std::string(data_, bytes_transferred)); } @@ -51,7 +53,8 @@ public: void handle_write(const boost::system::error_code& error) { if (!error) { - socket_.async_read_some(boost::asio::buffer(data_), + boost::asio::async_read(socket_, boost::asio::buffer(data_), + boost::asio::transfer_exactly(544), boost::bind(&session::handle_read, shared_from_this(), boost::asio::placeholders::error, @@ -63,6 +66,7 @@ public: boost::asio::async_write(socket_, boost::asio::buffer(msg.c_str(), msg.size()), + boost::asio::transfer_exactly(544), boost::bind(&session::handle_write, shared_from_this(), boost::asio::placeholders::error)); diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp index 56b79ce..3b0ca00 100644 --- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp +++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp @@ -8,6 +8,7 @@ #include #include #include "include/assert.h" +#include "include/Context.h" #include "CacheControllerSocketCommon.h" @@ -26,8 +27,12 @@ public: m_client_process_msg(processmsg), ep_(stream_protocol::endpoint(file)) { - std::thread thd([this](){io_service_.run(); }); - thd.detach(); + io_thread.reset(new std::thread([this](){io_service_.run(); })); + } + + ~CacheClient() { + io_service_.stop(); + io_thread->join(); } void run(){ @@ -53,7 +58,8 @@ public: message->offset = 0; message->length = 0; boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), - [this](const boost::system::error_code& err, size_t cb) { + [this, message](const boost::system::error_code& err, size_t cb) { + delete message; if (!err) { boost::asio::async_read(socket_, boost::asio::buffer(buffer_), boost::asio::transfer_exactly(544), @@ -72,7 +78,7 @@ public: return 0; } - int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) { + int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { rbdsc_req_type_t *message = new rbdsc_req_type_t(); message->type = RBDSC_READ; memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); @@ -82,49 +88,48 @@ public: message->length = 0; boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), - [this, result](const boost::system::error_code& err, size_t cb) { + [this, on_finish, message](const boost::system::error_code& err, size_t cb) { + delete message; if (!err) { - get_result(result); + get_result(on_finish); } else { return -1; } }); - std::unique_lock lk(m); - //cv.wait(lk); - cv.wait_for(lk, std::chrono::milliseconds(100)); + return 0; } - void get_result(bool* result) { + void get_result(Context* on_finish) { boost::asio::async_read(socket_, boost::asio::buffer(buffer_), boost::asio::transfer_exactly(544), - [this, result](const boost::system::error_code& err, size_t cb) { + [this, on_finish](const boost::system::error_code& err, size_t cb) { + if (cb != 544) { + assert(0); + } if (!err) { rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); if (io_ctx->type == RBDSC_READ_REPLY) { - *result = true; + on_finish->complete(true); + return; } else { - *result = false; + on_finish->complete(false); + return; } - cv.notify_one(); - m_client_process_msg(std::string(buffer_, cb)); } else { - return -1; + assert(0); + return on_finish->complete(false); } }); } - void handle_connect(const boost::system::error_code& error) { - //TODO(): open librbd snap - } - - void handle_write(const boost::system::error_code& error) { - } private: boost::asio::io_service& io_service_; boost::asio::io_service::work io_service_work_; stream_protocol::socket socket_; + + std::shared_ptr io_thread; ClientProcessMsg m_client_process_msg; stream_protocol::endpoint ep_; char buffer_[1024]; diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h index a9d73a8..e026ec8 100644 --- a/src/tools/rbd_cache/CacheControllerSocketCommon.h +++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h @@ -4,6 +4,7 @@ #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H #define CACHE_CONTROLLER_SOCKET_COMMON_H +/* #define RBDSC_REGISTER 0X11 #define RBDSC_READ 0X12 #define RBDSC_LOOKUP 0X13 @@ -11,10 +12,21 @@ #define RBDSC_READ_REPLY 0X15 #define RBDSC_LOOKUP_REPLY 0X16 #define RBDSC_READ_RADOS 0X17 +*/ namespace rbd { namespace cache { +static const int RBDSC_REGISTER = 0X11; +static const int RBDSC_READ = 0X12; +static const int RBDSC_LOOKUP = 0X13; +static const int RBDSC_REGISTER_REPLY = 0X14; +static const int RBDSC_READ_REPLY = 0X15; +static const int RBDSC_LOOKUP_REPLY = 0X16; +static const int RBDSC_READ_RADOS = 0X17; + + + typedef std::function ProcessMsg; typedef std::function ClientProcessMsg; typedef uint8_t rbdsc_req_type; -- 2.7.4