summaryrefslogtreecommitdiffstats
path: root/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch')
-rw-r--r--src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch366
1 files changed, 366 insertions, 0 deletions
diff --git a/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch b/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch
new file mode 100644
index 0000000..16dd41f
--- /dev/null
+++ b/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch
@@ -0,0 +1,366 @@
+From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001
+From: Yuan Zhou <yuan.zhou@intel.com>
+Date: Thu, 16 Aug 2018 17:28:46 +0800
+Subject: [PATCH 08/10] librbd: implement async cache lookup and read
+
+Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
+---
+ .../SharedPersistentObjectCacherObjectDispatch.cc | 63 ++++++++++++----------
+ .../SharedPersistentObjectCacherObjectDispatch.h | 7 +++
+ src/tools/rbd_cache/CacheController.cc | 9 ++--
+ src/tools/rbd_cache/CacheControllerSocket.hpp | 8 ++-
+ .../rbd_cache/CacheControllerSocketClient.hpp | 49 +++++++++--------
+ src/tools/rbd_cache/CacheControllerSocketCommon.h | 12 +++++
+ 6 files changed, 94 insertions(+), 54 deletions(-)
+
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+index 2aa5cad..407ce49 100644
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+@@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjec
+
+ template <typename I>
+ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
+- if (m_object_store) {
+ delete m_object_store;
+- }
+-
+- if (m_cache_client) {
+ delete m_cache_client;
+- }
+ }
+
+ template <typename I>
+@@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+ << object_len << dendl;
+
+- // ensure we aren't holding the cache lock post-read
+ on_dispatched = util::create_async_context_callback(*m_image_ctx,
+ on_dispatched);
++ auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
++ handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
++ });
+
+ if (m_cache_client && m_cache_client->connected && m_object_store) {
+- bool exists;
+ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
+- m_image_ctx->id, oid, &exists);
+-
+- // try to read from parent image
+- ldout(cct, 20) << "SRO cache object exists:" << exists << dendl;
+- if (exists) {
+- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
+- if (r != 0) {
+- *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+- on_dispatched->complete(r);
+- return true;
+- }
+- }
++ m_image_ctx->id, oid, ctx);
+ }
+-
+- ldout(cct, 20) << "Continue read from RADOS" << dendl;
+- *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+- on_dispatched->complete(0);
+ return true;
+ }
+
+ template <typename I>
++int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
++ bool cache,
++ const std::string &oid, uint64_t object_off, uint64_t object_len,
++ ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
++ Context* on_dispatched) {
++ // IO chained in reverse order
++ auto cct = m_image_ctx->cct;
++ ldout(cct, 20) << dendl;
++
++ // try to read from parent image
++ if (cache) {
++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
++ if (r != 0) {
++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
++ //TODO(): complete in syncfile
++ on_dispatched->complete(r);
++ ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
++ return true;
++ }
++ } else {
++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
++ on_dispatched->complete(0);
++ ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
++ return false;
++ }
++}
++
++template <typename I>
+ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
+ auto cct = m_image_ctx->cct;
+ ldout(cct, 20) << dendl;
+@@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
+ rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
+
+ switch (io_ctx->type) {
+- case RBDSC_REGISTER_REPLY: {
++ case rbd::cache::RBDSC_REGISTER_REPLY: {
+ // open cache handler for volume
+ ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
+ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
+
+ break;
+ }
+- case RBDSC_READ_REPLY: {
++ case rbd::cache::RBDSC_READ_REPLY: {
+ ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
+ //TODO(): should call read here
+
+ break;
+ }
+- case RBDSC_READ_RADOS: {
++ case rbd::cache::RBDSC_READ_RADOS: {
+ ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
+ //TODO(): should call read here
+
+ break;
+ }
+- default: ldout(cct, 20) << "nothing" << dendl;
++ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
+ break;
+
+ }
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+index 200688f..36b868a 100644
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+@@ -112,6 +112,13 @@ public:
+
+ private:
+
++ int handle_read_cache(
++ bool cache,
++ const std::string &oid, uint64_t object_off,
++ uint64_t object_len, ceph::bufferlist* read_data,
++ io::DispatchResult* dispatch_result,
++ Context* on_dispatched);
++
+ ImageCtxT* m_image_ctx;
+
+ void client_handle_request(std::string msg);
+diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
+index cefcf28..c9d674b 100644
+--- a/src/tools/rbd_cache/CacheController.cc
++++ b/src/tools/rbd_cache/CacheController.cc
+@@ -76,7 +76,7 @@ void CacheController::run() {
+ }
+ }
+
+-void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
++void CacheController::handle_request(uint64_t session_id, std::string msg){
+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
+
+ int ret = 0;
+@@ -86,7 +86,7 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
+ // init cache layout for volume
+ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
+ io_ctx->type = RBDSC_REGISTER_REPLY;
+- m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+
+ break;
+ }
+@@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
+ } else {
+ io_ctx->type = RBDSC_READ_REPLY;
+ }
+- m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
++ if (io_ctx->type != RBDSC_READ_REPLY) {
++ assert(0);
++ }
++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+
+ break;
+ }
+diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
+index 967af1d..d178b58 100644
+--- a/src/tools/rbd_cache/CacheControllerSocket.hpp
++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
+@@ -43,7 +43,9 @@ public:
+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
+
+ if (!error) {
+-
++ if(bytes_transferred != 544){
++ assert(0);
++ }
+ process_msg(session_id, std::string(data_, bytes_transferred));
+
+ }
+@@ -51,7 +53,8 @@ public:
+
+ void handle_write(const boost::system::error_code& error) {
+ if (!error) {
+- socket_.async_read_some(boost::asio::buffer(data_),
++ boost::asio::async_read(socket_, boost::asio::buffer(data_),
++ boost::asio::transfer_exactly(544),
+ boost::bind(&session::handle_read,
+ shared_from_this(),
+ boost::asio::placeholders::error,
+@@ -63,6 +66,7 @@ public:
+
+ boost::asio::async_write(socket_,
+ boost::asio::buffer(msg.c_str(), msg.size()),
++ boost::asio::transfer_exactly(544),
+ boost::bind(&session::handle_write,
+ shared_from_this(),
+ boost::asio::placeholders::error));
+diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+index 56b79ce..3b0ca00 100644
+--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+@@ -8,6 +8,7 @@
+ #include <boost/bind.hpp>
+ #include <boost/algorithm/string.hpp>
+ #include "include/assert.h"
++#include "include/Context.h"
+ #include "CacheControllerSocketCommon.h"
+
+
+@@ -26,8 +27,12 @@ public:
+ m_client_process_msg(processmsg),
+ ep_(stream_protocol::endpoint(file))
+ {
+- std::thread thd([this](){io_service_.run(); });
+- thd.detach();
++ io_thread.reset(new std::thread([this](){io_service_.run(); }));
++ }
++
++ ~CacheClient() {
++ io_service_.stop();
++ io_thread->join();
+ }
+
+ void run(){
+@@ -53,7 +58,8 @@ public:
+ message->offset = 0;
+ message->length = 0;
+ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
+- [this](const boost::system::error_code& err, size_t cb) {
++ [this, message](const boost::system::error_code& err, size_t cb) {
++ delete message;
+ if (!err) {
+ boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
+ boost::asio::transfer_exactly(544),
+@@ -72,7 +78,7 @@ public:
+ return 0;
+ }
+
+- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) {
++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
+ rbdsc_req_type_t *message = new rbdsc_req_type_t();
+ message->type = RBDSC_READ;
+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
+@@ -82,49 +88,48 @@ public:
+ message->length = 0;
+
+ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
+- [this, result](const boost::system::error_code& err, size_t cb) {
++ [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
++ delete message;
+ if (!err) {
+- get_result(result);
++ get_result(on_finish);
+ } else {
+ return -1;
+ }
+ });
+- std::unique_lock<std::mutex> lk(m);
+- //cv.wait(lk);
+- cv.wait_for(lk, std::chrono::milliseconds(100));
++
+ return 0;
+ }
+
+- void get_result(bool* result) {
++ void get_result(Context* on_finish) {
+ boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
+ boost::asio::transfer_exactly(544),
+- [this, result](const boost::system::error_code& err, size_t cb) {
++ [this, on_finish](const boost::system::error_code& err, size_t cb) {
++ if (cb != 544) {
++ assert(0);
++ }
+ if (!err) {
+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
+ if (io_ctx->type == RBDSC_READ_REPLY) {
+- *result = true;
++ on_finish->complete(true);
++ return;
+ } else {
+- *result = false;
++ on_finish->complete(false);
++ return;
+ }
+- cv.notify_one();
+- m_client_process_msg(std::string(buffer_, cb));
+ } else {
+- return -1;
++ assert(0);
++ return on_finish->complete(false);
+ }
+ });
+ }
+
+- void handle_connect(const boost::system::error_code& error) {
+- //TODO(): open librbd snap
+- }
+-
+- void handle_write(const boost::system::error_code& error) {
+- }
+
+ private:
+ boost::asio::io_service& io_service_;
+ boost::asio::io_service::work io_service_work_;
+ stream_protocol::socket socket_;
++
++ std::shared_ptr<std::thread> io_thread;
+ ClientProcessMsg m_client_process_msg;
+ stream_protocol::endpoint ep_;
+ char buffer_[1024];
+diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+index a9d73a8..e026ec8 100644
+--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+@@ -4,6 +4,7 @@
+ #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
+ #define CACHE_CONTROLLER_SOCKET_COMMON_H
+
++/*
+ #define RBDSC_REGISTER 0X11
+ #define RBDSC_READ 0X12
+ #define RBDSC_LOOKUP 0X13
+@@ -11,10 +12,21 @@
+ #define RBDSC_READ_REPLY 0X15
+ #define RBDSC_LOOKUP_REPLY 0X16
+ #define RBDSC_READ_RADOS 0X17
++*/
+
+ namespace rbd {
+ namespace cache {
+
++static const int RBDSC_REGISTER = 0X11;
++static const int RBDSC_READ = 0X12;
++static const int RBDSC_LOOKUP = 0X13;
++static const int RBDSC_REGISTER_REPLY = 0X14;
++static const int RBDSC_READ_REPLY = 0X15;
++static const int RBDSC_LOOKUP_REPLY = 0X16;
++static const int RBDSC_READ_RADOS = 0X17;
++
++
++
+ typedef std::function<void(uint64_t, std::string)> ProcessMsg;
+ typedef std::function<void(std::string)> ClientProcessMsg;
+ typedef uint8_t rbdsc_req_type;
+--
+2.7.4
+