diff options
-rw-r--r-- | src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch | 1685 | ||||
-rw-r--r-- | src/ceph/0002-librbd-cleanup-rbd-shared-RO-cache.patch | 847 | ||||
-rw-r--r-- | src/ceph/0003-librbd-fix-bufferlist-point.patch | 71 | ||||
-rw-r--r-- | src/ceph/0004-librbd-fix-lookup-object-return.patch | 45 | ||||
-rw-r--r-- | src/ceph/0005-librbd-fix-conf-get_val.patch | 63 | ||||
-rw-r--r-- | src/ceph/0006-librbd-LRU-policy-based-eviction.patch | 403 | ||||
-rw-r--r-- | src/ceph/0007-librbd-cleanup-policy-based-promotion-eviction.patch | 512 | ||||
-rw-r--r-- | src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch | 366 | ||||
-rw-r--r-- | src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch | 767 | ||||
-rw-r--r-- | src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch | 3510 | ||||
-rw-r--r-- | src/ceph/ceph.rc | 12 |
11 files changed, 1 insertions, 8280 deletions
diff --git a/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch b/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch deleted file mode 100644 index 0476086..0000000 --- a/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch +++ /dev/null @@ -1,1685 +0,0 @@ -From b7b81562c76011abe05930330915a5ba423964e4 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Thu, 19 Apr 2018 22:54:36 +0800 -Subject: [PATCH 01/10] librbd: shared persistent read-only rbd cache - -This patch introduces introduces RBD shared persistent RO cache which -can provide client-side sharing cache for rbd clone/snapshot case. - -The key componenets are: - -- RBD cache daemon runs on each compute node to control the shared cache state - -- Read-only blocks from parent image(s) are cached in a shared area on - compute node(s) - -- Object level dispatcher inside librbd that can do RPC with cache daemon to - lookup the cache - -- Reads are served from the shared cache until the first COW request - -- Policy to control promotion/evication of the shared cache - -The general IO flow is: - -0) Parent image would register themselfs when initializing - -1) When read request on cloned image flows to parent image, it will check with - the cache daemon if the rarget object is ready - -2) Cache daemon receives the lookup request: - a) if the target object is promoted, daemon will ack with "read_from_cache" - b) if it is not promoted, daemon will check the policy whether to promote: - - if yes, daemon will do the promiton then ack with "read_from_cache" - - if no, daemon will ack with "read_from_rados" - -3) the read reqeust contines to do read from cache/rados based on the ack - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/common/options.cc | 8 ++ - src/librbd/CMakeLists.txt | 4 +- - src/librbd/ImageCtx.cc | 5 +- - src/librbd/ImageCtx.h | 3 + - src/librbd/cache/SharedPersistentObjectCacher.cc | 61 ++++++++ - src/librbd/cache/SharedPersistentObjectCacher.h | 45 ++++++ - .../SharedPersistentObjectCacherObjectDispatch.cc | 154 +++++++++++++++++++++ - .../SharedPersistentObjectCacherObjectDispatch.h | 127 +++++++++++++++++ - src/librbd/image/OpenRequest.cc | 12 +- - src/librbd/io/Types.h | 1 + - src/os/CacheStore/SyncFile.cc | 110 +++++++++++++++ - src/os/CacheStore/SyncFile.h | 74 ++++++++++ - src/test/librbd/test_mirroring.cc | 1 + - src/test/rbd_mirror/test_ImageReplayer.cc | 2 + - src/test/rbd_mirror/test_fixture.cc | 1 + - src/tools/CMakeLists.txt | 1 + - src/tools/rbd_cache/CMakeLists.txt | 9 ++ - src/tools/rbd_cache/CacheController.cc | 105 ++++++++++++++ - src/tools/rbd_cache/CacheController.hpp | 49 +++++++ - src/tools/rbd_cache/CacheControllerSocket.hpp | 125 +++++++++++++++++ - .../rbd_cache/CacheControllerSocketClient.hpp | 131 ++++++++++++++++++ - src/tools/rbd_cache/CacheControllerSocketCommon.h | 43 ++++++ - src/tools/rbd_cache/ObjectCacheStore.cc | 147 ++++++++++++++++++++ - src/tools/rbd_cache/ObjectCacheStore.h | 65 +++++++++ - src/tools/rbd_cache/main.cc | 85 ++++++++++++ - 25 files changed, 1365 insertions(+), 3 deletions(-) - create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.cc - create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.h - create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc - create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h - create mode 100644 src/os/CacheStore/SyncFile.cc - create mode 100644 src/os/CacheStore/SyncFile.h - create mode 100644 src/tools/rbd_cache/CMakeLists.txt - create mode 100644 src/tools/rbd_cache/CacheController.cc - create mode 100644 src/tools/rbd_cache/CacheController.hpp - create mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp - create mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp - create mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h - create mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc - create mode 100644 src/tools/rbd_cache/ObjectCacheStore.h - create mode 100644 src/tools/rbd_cache/main.cc - -diff --git a/src/common/options.cc b/src/common/options.cc -index c5afe4c..7839a31 100644 ---- a/src/common/options.cc -+++ b/src/common/options.cc -@@ -6357,6 +6357,14 @@ static std::vector<Option> get_rbd_options() { - .set_default(60) - .set_description("time in seconds for detecting a hung thread"), - -+ Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) -+ .set_default(true) -+ .set_description("whether to enable shared ssd caching"), -+ -+ Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED) -+ .set_default("/tmp") -+ .set_description("shared ssd caching data dir"), -+ - Option("rbd_non_blocking_aio", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) - .set_default(true) - .set_description("process AIO ops from a dispatch thread to prevent blocking"), -diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt -index b9c08d4..92539a8 100644 ---- a/src/librbd/CMakeLists.txt -+++ b/src/librbd/CMakeLists.txt -@@ -32,7 +32,8 @@ set(librbd_internal_srcs - api/Snapshot.cc - cache/ImageWriteback.cc - cache/ObjectCacherObjectDispatch.cc -- cache/PassthroughImageCache.cc -+ cache/SharedPersistentObjectCacherObjectDispatch.cc -+ cache/SharedPersistentObjectCacher.cc - deep_copy/ImageCopyRequest.cc - deep_copy/MetadataCopyRequest.cc - deep_copy/ObjectCopyRequest.cc -@@ -123,6 +124,7 @@ set(librbd_internal_srcs - trash/MoveRequest.cc - watcher/Notifier.cc - watcher/RewatchRequest.cc -+ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc - ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) - - add_library(rbd_api STATIC librbd.cc) -diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc -index 48f98b1..349156b 100644 ---- a/src/librbd/ImageCtx.cc -+++ b/src/librbd/ImageCtx.cc -@@ -776,7 +776,8 @@ public: - "rbd_qos_read_iops_limit", false)( - "rbd_qos_write_iops_limit", false)( - "rbd_qos_read_bps_limit", false)( -- "rbd_qos_write_bps_limit", false); -+ "rbd_qos_write_bps_limit", false)( -+ "rbd_shared_cache_enabled", false); - - ConfigProxy local_config_t{false}; - std::map<std::string, bufferlist> res; -@@ -844,6 +845,8 @@ public: - ASSIGN_OPTION(qos_write_iops_limit, uint64_t); - ASSIGN_OPTION(qos_read_bps_limit, uint64_t); - ASSIGN_OPTION(qos_write_bps_limit, uint64_t); -+ ASSIGN_OPTION(shared_cache_enabled, bool); -+ ASSIGN_OPTION(shared_cache_path, std::string); - - if (thread_safe) { - ASSIGN_OPTION(journal_pool, std::string); -diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h -index d197c24..f661c09 100644 ---- a/src/librbd/ImageCtx.h -+++ b/src/librbd/ImageCtx.h -@@ -204,6 +204,9 @@ namespace librbd { - uint64_t qos_read_bps_limit; - uint64_t qos_write_bps_limit; - -+ bool shared_cache_enabled; -+ std::string shared_cache_path; -+ - LibrbdAdminSocketHook *asok_hook; - - exclusive_lock::Policy *exclusive_lock_policy = nullptr; -diff --git a/src/librbd/cache/SharedPersistentObjectCacher.cc b/src/librbd/cache/SharedPersistentObjectCacher.cc -new file mode 100644 -index 0000000..a849260 ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacher.cc -@@ -0,0 +1,61 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "librbd/cache/SharedPersistentObjectCacher.h" -+#include "include/buffer.h" -+#include "common/dout.h" -+#include "librbd/ImageCtx.h" -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacher: " << this \ -+ << " " << __func__ << ": " -+ -+namespace librbd { -+namespace cache { -+ -+template <typename I> -+SharedPersistentObjectCacher<I>::SharedPersistentObjectCacher(I *image_ctx, std::string cache_path) -+ : m_image_ctx(image_ctx), m_cache_path(cache_path), -+ m_file_map_lock("librbd::cache::SharedObjectCacher::filemaplock") { -+ auto *cct = m_image_ctx->cct; -+ -+} -+ -+template <typename I> -+SharedPersistentObjectCacher<I>::~SharedPersistentObjectCacher() { -+ for(auto &it: file_map) { -+ if(it.second) { -+ delete it.second; -+ } -+ } -+} -+ -+template <typename I> -+int SharedPersistentObjectCacher<I>::read_object(std::string oid, ceph::bufferlist* read_data, uint64_t offset, uint64_t length, Context *on_finish) { -+ -+ auto *cct = m_image_ctx->cct; -+ ldout(cct, 20) << "object: " << oid << dendl; -+ -+ std::string cache_file_name = m_image_ctx->data_ctx.get_pool_name() + oid; -+ -+ //TODO(): make a cache for cachefile fd -+ os::CacheStore::SyncFile* target_cache_file = new os::CacheStore::SyncFile(cct, cache_file_name); -+ target_cache_file->open(); -+ -+ int ret = target_cache_file->read_object_from_file(read_data, offset, length); -+ if (ret < 0) { -+ ldout(cct, 5) << "read from file return error: " << ret -+ << "file name= " << cache_file_name -+ << dendl; -+ } -+ -+ delete target_cache_file; -+ return ret; -+} -+ -+ -+} // namespace cache -+} // namespace librbd -+ -+template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>; -diff --git a/src/librbd/cache/SharedPersistentObjectCacher.h b/src/librbd/cache/SharedPersistentObjectCacher.h -new file mode 100644 -index 0000000..d108a05 ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacher.h -@@ -0,0 +1,45 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER -+#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER -+ -+#include "include/buffer_fwd.h" -+#include "include/int_types.h" -+#include "os/CacheStore/SyncFile.h" -+#include "common/Mutex.h" -+#include <vector> -+#include <map> -+ -+struct Context; -+ -+namespace librbd { -+ -+struct ImageCtx; -+ -+namespace cache { -+ -+template <typename ImageCtxT> -+class SharedPersistentObjectCacher { -+public: -+ -+ SharedPersistentObjectCacher(ImageCtxT *image_ctx, std::string cache_path); -+ ~SharedPersistentObjectCacher(); -+ -+ int read_object(std::string oid, ceph::bufferlist* read_data, -+ uint64_t offset, uint64_t length, Context *on_finish); -+ -+private: -+ ImageCtxT *m_image_ctx; -+ std::map<std::string, os::CacheStore::SyncFile*> file_map; -+ Mutex m_file_map_lock; -+ std::string m_cache_path; -+ -+}; -+ -+} // namespace cache -+} // namespace librbd -+ -+extern template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>; -+ -+#endif // CEPH_LIBRBD_CACHE_FILE_IMAGE_STORE -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -new file mode 100644 -index 0000000..90d886c ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -@@ -0,0 +1,154 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h" -+#include "common/WorkQueue.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/Journal.h" -+#include "librbd/Utils.h" -+#include "librbd/LibrbdWriteback.h" -+#include "librbd/io/ObjectDispatchSpec.h" -+#include "librbd/io/ObjectDispatcher.h" -+#include "librbd/io/Utils.h" -+#include "osd/osd_types.h" -+#include "osdc/WritebackHandler.h" -+#include <vector> -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \ -+ << this << " " << __func__ << ": " -+ -+namespace librbd { -+namespace cache { -+ -+template <typename I> -+SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch( -+ I* image_ctx) : m_image_ctx(image_ctx) { -+} -+ -+template <typename I> -+SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { -+ if (m_object_store) { -+ delete m_object_store; -+ } -+ -+ if (m_cache_client) { -+ delete m_cache_client; -+ } -+} -+ -+template <typename I> -+void SharedPersistentObjectCacherObjectDispatch<I>::init() { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 5) << dendl; -+ -+ if (m_image_ctx->parent != nullptr) { -+ //TODO(): should we cover multi-leveled clone? -+ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; -+ return; -+ } -+ -+ ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; -+ -+ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -+ m_cache_client = new CacheClient(io_service, controller_path.c_str(), -+ ([&](std::string s){client_handle_request(s);})); -+ -+ int ret = m_cache_client->connect(); -+ if (ret < 0) { -+ ldout(cct, 5) << "SRO cache client fail to connect with local controller: " -+ << "please start rbd-cache daemon" -+ << dendl; -+ } else { -+ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " -+ << "name = " << m_image_ctx->id -+ << dendl; -+ -+ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), -+ m_image_ctx->id, m_image_ctx->size); -+ -+ if (ret >= 0) { -+ // add ourself to the IO object dispatcher chain -+ m_image_ctx->io_object_dispatcher->register_object_dispatch(this); -+ } -+ } -+} -+ -+template <typename I> -+bool SharedPersistentObjectCacherObjectDispatch<I>::read( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, librados::snap_t snap_id, int op_flags, -+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -+ io::ExtentMap* extent_map, int* object_dispatch_flags, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ // IO chained in reverse order -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" -+ << object_len << dendl; -+ -+ // ensure we aren't holding the cache lock post-read -+ on_dispatched = util::create_async_context_callback(*m_image_ctx, -+ on_dispatched); -+ -+ if (m_cache_client && m_cache_client->connected && m_object_store) { -+ bool exists; -+ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), -+ m_image_ctx->id, oid, &exists); -+ -+ // try to read from parent image -+ ldout(cct, 20) << "SRO cache object exists:" << exists << dendl; -+ if (exists) { -+ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -+ if (r != 0) { -+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; -+ on_dispatched->complete(r); -+ return true; -+ } -+ } -+ } -+ -+ ldout(cct, 20) << "Continue read from RADOS" << dendl; -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ return true; -+} -+ -+template <typename I> -+void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << dendl; -+ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -+ -+ switch (io_ctx->type) { -+ case RBDSC_REGISTER_REPLY: { -+ // open cache handler for volume -+ ldout(cct, 20) << "SRO cache client open cache handler" << dendl; -+ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); -+ -+ break; -+ } -+ case RBDSC_READ_REPLY: { -+ ldout(cct, 20) << "SRO cache client start to read cache" << dendl; -+ //TODO(): should call read here -+ -+ break; -+ } -+ case RBDSC_READ_RADOS: { -+ ldout(cct, 20) << "SRO cache client start to read rados" << dendl; -+ //TODO(): should call read here -+ -+ break; -+ } -+ default: ldout(cct, 20) << "nothing" << dendl; -+ break; -+ -+ } -+} -+ -+} // namespace cache -+} // namespace librbd -+ -+template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -new file mode 100644 -index 0000000..1ede804 ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -@@ -0,0 +1,127 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -+#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -+ -+#include "librbd/io/ObjectDispatchInterface.h" -+#include "common/Mutex.h" -+#include "osdc/ObjectCacher.h" -+#include "tools/rbd_cache/CacheControllerSocketClient.hpp" -+#include "SharedPersistentObjectCacher.h" -+ -+struct WritebackHandler; -+ -+namespace librbd { -+ -+class ImageCtx; -+ -+namespace cache { -+ -+/** -+ * Facade around the OSDC object cacher to make it align with -+ * the object dispatcher interface -+ */ -+template <typename ImageCtxT = ImageCtx> -+class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface { -+public: -+ static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) { -+ return new SharedPersistentObjectCacherObjectDispatch(image_ctx); -+ } -+ -+ SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx); -+ ~SharedPersistentObjectCacherObjectDispatch() override; -+ -+ io::ObjectDispatchLayer get_object_dispatch_layer() const override { -+ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; -+ } -+ -+ void init(); -+ void shut_down(Context* on_finish) { -+ m_image_ctx->op_work_queue->queue(on_finish, 0); -+ } -+ -+ bool read( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, librados::snap_t snap_id, int op_flags, -+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -+ io::ExtentMap* extent_map, int* object_dispatch_flags, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) override; -+ -+ bool discard( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool write( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool write_same( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, io::Extents&& buffer_extents, -+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool compare_and_write( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, -+ const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, -+ int* object_dispatch_flags, uint64_t* journal_tid, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ return false; -+ } -+ -+ bool flush( -+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ return false; -+ } -+ -+ bool invalidate_cache(Context* on_finish) { -+ return false; -+ } -+ -+ bool reset_existence_cache(Context* on_finish) { -+ return false; -+ } -+ -+ void extent_overwritten( -+ uint64_t object_no, uint64_t object_off, uint64_t object_len, -+ uint64_t journal_tid, uint64_t new_journal_tid) { -+ } -+ -+ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; -+ -+private: -+ -+ ImageCtxT* m_image_ctx; -+ -+ void client_handle_request(std::string msg); -+ CacheClient *m_cache_client = nullptr; -+ boost::asio::io_service io_service; -+}; -+ -+} // namespace cache -+} // namespace librbd -+ -+extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; -+ -+#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H -diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc -index ae18739..30a7b66 100644 ---- a/src/librbd/image/OpenRequest.cc -+++ b/src/librbd/image/OpenRequest.cc -@@ -8,6 +8,7 @@ - #include "librbd/ImageCtx.h" - #include "librbd/Utils.h" - #include "librbd/cache/ObjectCacherObjectDispatch.h" -+#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc" - #include "librbd/image/CloseRequest.h" - #include "librbd/image/RefreshRequest.h" - #include "librbd/image/SetSnapRequest.h" -@@ -448,12 +449,21 @@ Context *OpenRequest<I>::handle_refresh(int *result) { - - template <typename I> - Context *OpenRequest<I>::send_init_cache(int *result) { -+ -+ CephContext *cct = m_image_ctx->cct; - // cache is disabled or parent image context - if (!m_image_ctx->cache || m_image_ctx->child != nullptr) { -+ -+ // enable Shared Read-only cache for parent image -+ if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) { -+ ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl; -+ auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx); -+ sro_cache->init(); -+ } -+ - return send_register_watch(result); - } - -- CephContext *cct = m_image_ctx->cct; - ldout(cct, 10) << this << " " << __func__ << dendl; - - auto cache = cache::ObjectCacherObjectDispatch<I>::create(m_image_ctx); -diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h -index 7e09c90..ef3049f 100644 ---- a/src/librbd/io/Types.h -+++ b/src/librbd/io/Types.h -@@ -59,6 +59,7 @@ enum DispatchResult { - enum ObjectDispatchLayer { - OBJECT_DISPATCH_LAYER_NONE = 0, - OBJECT_DISPATCH_LAYER_CACHE, -+ OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE, - OBJECT_DISPATCH_LAYER_JOURNAL, - OBJECT_DISPATCH_LAYER_CORE, - OBJECT_DISPATCH_LAYER_LAST -diff --git a/src/os/CacheStore/SyncFile.cc b/src/os/CacheStore/SyncFile.cc -new file mode 100644 -index 0000000..5352bde ---- /dev/null -+++ b/src/os/CacheStore/SyncFile.cc -@@ -0,0 +1,110 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "os/CacheStore/SyncFile.h" -+#include "include/Context.h" -+#include "common/dout.h" -+#include "common/WorkQueue.h" -+#include "librbd/ImageCtx.h" -+#include <sys/types.h> -+#include <sys/stat.h> -+#include <aio.h> -+#include <errno.h> -+#include <fcntl.h> -+#include <utility> -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::file::SyncFile: " << this << " " \ -+ << __func__ << ": " -+ -+namespace os { -+namespace CacheStore { -+ -+SyncFile::SyncFile(CephContext *cct, const std::string &name) -+ : cct(cct) -+{ -+ m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; -+ ldout(cct, 20) << "file path=" << m_name << dendl; -+} -+ -+SyncFile::~SyncFile() -+{ -+ // TODO force proper cleanup -+ if (m_fd != -1) { -+ ::close(m_fd); -+ } -+} -+ -+void SyncFile::open(Context *on_finish) -+{ -+ while (true) { -+ m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC, -+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); -+ if (m_fd == -1) { -+ int r = -errno; -+ if (r == -EINTR) { -+ continue; -+ } -+ on_finish->complete(r); -+ return; -+ } -+ break; -+ } -+ -+ on_finish->complete(0); -+} -+ -+void SyncFile::open() -+{ -+ while (true) -+ { -+ m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC, -+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); -+ if (m_fd == -1) -+ { -+ int r = -errno; -+ if (r == -EINTR) { -+ continue; -+ } -+ return; -+ } -+ break; -+ } -+} -+ -+int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) { -+ -+ ldout(cct, 20) << "cache file name:" << m_name -+ << ", length:" << object_len << dendl; -+ -+ // TODO(): aio -+ int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0); -+ if(ret < 0) { -+ lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl; -+ return ret; -+ } -+ -+ return ret; -+} -+ -+int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) { -+ -+ ldout(cct, 20) << "offset:" << object_off -+ << ", length:" << object_len << dendl; -+ -+ bufferptr buf(object_len); -+ -+ // TODO(): aio -+ int ret = pread(m_fd, buf.c_str(), object_len, object_off); -+ if(ret < 0) { -+ lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl; -+ return ret; -+ } -+ read_buf->append(std::move(buf)); -+ -+ return ret; -+} -+ -+} // namespace CacheStore -+} // namespace os -diff --git a/src/os/CacheStore/SyncFile.h b/src/os/CacheStore/SyncFile.h -new file mode 100644 -index 0000000..81602ce ---- /dev/null -+++ b/src/os/CacheStore/SyncFile.h -@@ -0,0 +1,74 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBOS_CACHE_STORE_SYNC_FILE -+#define CEPH_LIBOS_CACHE_STORE_SYNC_FILE -+ -+#include "include/buffer_fwd.h" -+#include <sys/mman.h> -+#include <string> -+ -+struct Context; -+struct ContextWQ; -+class CephContext; -+ -+namespace os { -+ -+namespace CacheStore { -+ -+class SyncFile { -+public: -+ SyncFile(CephContext *cct, const std::string &name); -+ ~SyncFile(); -+ -+ // TODO use IO queue instead of individual commands so operations can be -+ // submitted in batch -+ -+ // TODO use scatter/gather API -+ -+ void open(Context *on_finish); -+ -+ // ## -+ void open(); -+ bool try_open(); -+ void close(Context *on_finish); -+ void remove(Context *on_finish); -+ -+ void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish); -+ -+ void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish); -+ -+ void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish); -+ -+ void truncate(uint64_t length, bool fdatasync, Context *on_finish); -+ -+ void fsync(Context *on_finish); -+ -+ void fdatasync(Context *on_finish); -+ -+ uint64_t filesize(); -+ -+ int load(void** dest, uint64_t filesize); -+ -+ int remove(); -+ -+ // ## -+ int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len); -+ int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len); -+ -+private: -+ CephContext *cct; -+ std::string m_name; -+ int m_fd = -1; -+ -+ int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync); -+ int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl); -+ int discard(uint64_t offset, uint64_t length, bool fdatasync); -+ int truncate(uint64_t length, bool fdatasync); -+ int fdatasync(); -+}; -+ -+} // namespace CacheStore -+} // namespace os -+ -+#endif // CEPH_LIBOS_CACHE_STORE_SYNC_FILE -diff --git a/src/test/librbd/test_mirroring.cc b/src/test/librbd/test_mirroring.cc -index b4fdeae..d7d1aa6 100644 ---- a/src/test/librbd/test_mirroring.cc -+++ b/src/test/librbd/test_mirroring.cc -@@ -47,6 +47,7 @@ public: - - void SetUp() override { - ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx)); -+ ASSERT_EQ(0, _rados.conf_set("rbd_shared_cache_enabled", "false")); - } - - std::string image_name = "mirrorimg1"; -diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc -index 8a95a65..b5598bd 100644 ---- a/src/test/rbd_mirror/test_ImageReplayer.cc -+++ b/src/test/rbd_mirror/test_ImageReplayer.cc -@@ -90,6 +90,7 @@ public: - EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get())); - EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false")); - EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_poll_age", "1")); -+ EXPECT_EQ(0, m_local_cluster->conf_set("rbd_shared_cache_enabled", "false")); - - m_local_pool_name = get_temp_pool_name(); - EXPECT_EQ(0, m_local_cluster->pool_create(m_local_pool_name.c_str())); -@@ -99,6 +100,7 @@ public: - - EXPECT_EQ("", connect_cluster_pp(m_remote_cluster)); - EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_cache", "false")); -+ EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_shared_cache_enabled", "false")); - - m_remote_pool_name = get_temp_pool_name(); - EXPECT_EQ(0, m_remote_cluster.pool_create(m_remote_pool_name.c_str())); -diff --git a/src/test/rbd_mirror/test_fixture.cc b/src/test/rbd_mirror/test_fixture.cc -index b2a51ca..9e77098 100644 ---- a/src/test/rbd_mirror/test_fixture.cc -+++ b/src/test/rbd_mirror/test_fixture.cc -@@ -27,6 +27,7 @@ void TestFixture::SetUpTestCase() { - _rados = std::shared_ptr<librados::Rados>(new librados::Rados()); - ASSERT_EQ("", connect_cluster_pp(*_rados.get())); - ASSERT_EQ(0, _rados->conf_set("rbd_cache", "false")); -+ ASSERT_EQ(0, _rados->conf_set("rbd_shared_cache_enabled", "false")); - - _local_pool_name = get_temp_pool_name("test-rbd-mirror-"); - ASSERT_EQ(0, _rados->pool_create(_local_pool_name.c_str())); -diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt -index 3789e3c..72ab342 100644 ---- a/src/tools/CMakeLists.txt -+++ b/src/tools/CMakeLists.txt -@@ -99,6 +99,7 @@ endif(WITH_CEPHFS) - if(WITH_RBD) - add_subdirectory(rbd) - add_subdirectory(rbd_mirror) -+ add_subdirectory(rbd_cache) - if(LINUX) - add_subdirectory(rbd_nbd) - endif() -diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt -new file mode 100644 -index 0000000..08eae60 ---- /dev/null -+++ b/src/tools/rbd_cache/CMakeLists.txt -@@ -0,0 +1,9 @@ -+add_executable(rbd-cache -+ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc -+ ObjectCacheStore.cc -+ CacheController.cc -+ main.cc) -+target_link_libraries(rbd-cache -+ librados -+ global) -+install(TARGETS rbd-cache DESTINATION bin) -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -new file mode 100644 -index 0000000..c914358 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheController.cc -@@ -0,0 +1,105 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "CacheController.hpp" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_rbd_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ -+ << __func__ << ": " -+ -+ -+class ThreadPoolSingleton : public ThreadPool { -+public: -+ ContextWQ *op_work_queue; -+ -+ explicit ThreadPoolSingleton(CephContext *cct) -+ : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, -+ "pcache_threads"), -+ op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", -+ cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"), -+ this)) { -+ start(); -+ } -+ ~ThreadPoolSingleton() override { -+ op_work_queue->drain(); -+ delete op_work_queue; -+ -+ stop(); -+ } -+}; -+ -+ -+CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): -+ m_args(args), m_cct(cct) { -+ -+} -+ -+CacheController::~CacheController() { -+ -+} -+ -+int CacheController::init() { -+ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( -+ "rbd::cache::thread_pool", false, m_cct); -+ pcache_op_work_queue = thread_pool_singleton->op_work_queue; -+ -+ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); -+ int r = m_object_cache_store->init(false); -+ if (r < 0) { -+ //derr << "init error\n" << dendl; -+ } -+ return r; -+} -+ -+int CacheController::shutdown() { -+ int r = m_object_cache_store->shutdown(); -+ return r; -+} -+ -+void CacheController::handle_signal(int signum){} -+ -+void CacheController::run() { -+ try { -+ //TODO(): use new socket path -+ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -+ std::remove(controller_path.c_str()); -+ -+ m_cache_server = new CacheServer(io_service, controller_path, -+ ([&](uint64_t p, std::string s){handle_request(p, s);})); -+ io_service.run(); -+ } catch (std::exception& e) { -+ std::cerr << "Exception: " << e.what() << "\n"; -+ } -+} -+ -+void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -+ -+ int ret = 0; -+ -+ switch (io_ctx->type) { -+ case RBDSC_REGISTER: { -+ // init cache layout for volume -+ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); -+ io_ctx->type = RBDSC_REGISTER_REPLY; -+ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); -+ -+ break; -+ } -+ case RBDSC_READ: { -+ // lookup object in local cache store -+ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); -+ if (ret < 0) { -+ io_ctx->type = RBDSC_READ_RADOS; -+ } else { -+ io_ctx->type = RBDSC_READ_REPLY; -+ } -+ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); -+ -+ break; -+ } -+ -+ } -+} -diff --git a/src/tools/rbd_cache/CacheController.hpp b/src/tools/rbd_cache/CacheController.hpp -new file mode 100644 -index 0000000..97113e4 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheController.hpp -@@ -0,0 +1,49 @@ -+#ifndef CACHE_CONTROLLER_H -+#define CACHE_CONTROLLER_H -+ -+#include <thread> -+ -+#include "common/Formatter.h" -+#include "common/admin_socket.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "common/WorkQueue.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "include/assert.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+ -+#include "CacheControllerSocket.hpp" -+#include "ObjectCacheStore.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+class CacheController { -+ public: -+ CacheController(CephContext *cct, const std::vector<const char*> &args); -+ ~CacheController(); -+ -+ int init(); -+ -+ int shutdown(); -+ -+ void handle_signal(int sinnum); -+ -+ void run(); -+ -+ void handle_request(uint64_t sesstion_id, std::string msg); -+ -+ private: -+ boost::asio::io_service io_service; -+ CacheServer *m_cache_server; -+ std::vector<const char*> m_args; -+ CephContext *m_cct; -+ ObjectCacheStore *m_object_cache_store; -+ ContextWQ* pcache_op_work_queue; -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp -new file mode 100644 -index 0000000..6e1a743 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp -@@ -0,0 +1,125 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CACHE_CONTROLLER_SOCKET_H -+#define CACHE_CONTROLLER_SOCKET_H -+ -+#include <cstdio> -+#include <iostream> -+#include <array> -+#include <memory> -+#include <string> -+#include <boost/bind.hpp> -+#include <boost/asio.hpp> -+#include <boost/algorithm/string.hpp> -+#include "CacheControllerSocketCommon.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+class session : public std::enable_shared_from_this<session> { -+public: -+ session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) -+ : session_id(session_id), socket_(io_service), process_msg(processmsg) {} -+ -+ stream_protocol::socket& socket() { -+ return socket_; -+ } -+ -+ void start() { -+ -+ boost::asio::async_read(socket_, boost::asio::buffer(data_), -+ boost::asio::transfer_exactly(544), -+ boost::bind(&session::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ -+ } -+ -+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { -+ -+ if (!error) { -+ -+ process_msg(session_id, std::string(data_, bytes_transferred)); -+ -+ } -+ } -+ -+ void handle_write(const boost::system::error_code& error) { -+ if (!error) { -+ socket_.async_read_some(boost::asio::buffer(data_), -+ boost::bind(&session::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ } -+ } -+ -+ void send(std::string msg) { -+ -+ boost::asio::async_write(socket_, -+ boost::asio::buffer(msg.c_str(), msg.size()), -+ boost::bind(&session::handle_write, -+ shared_from_this(), -+ boost::asio::placeholders::error)); -+ -+ } -+ -+private: -+ uint64_t session_id; -+ stream_protocol::socket socket_; -+ ProcessMsg process_msg; -+ -+ // Buffer used to store data received from the client. -+ //std::array<char, 1024> data_; -+ char data_[1024]; -+}; -+ -+typedef std::shared_ptr<session> session_ptr; -+ -+class CacheServer { -+public: -+ CacheServer(boost::asio::io_service& io_service, -+ const std::string& file, ProcessMsg processmsg) -+ : io_service_(io_service), -+ server_process_msg(processmsg), -+ acceptor_(io_service, stream_protocol::endpoint(file)) -+ { -+ session_ptr new_session(new session(session_id, io_service_, server_process_msg)); -+ acceptor_.async_accept(new_session->socket(), -+ boost::bind(&CacheServer::handle_accept, this, new_session, -+ boost::asio::placeholders::error)); -+ } -+ -+ void handle_accept(session_ptr new_session, -+ const boost::system::error_code& error) -+ { -+ //TODO(): open librbd snap -+ if (!error) { -+ new_session->start(); -+ session_map.emplace(session_id, new_session); -+ session_id++; -+ new_session.reset(new session(session_id, io_service_, server_process_msg)); -+ acceptor_.async_accept(new_session->socket(), -+ boost::bind(&CacheServer::handle_accept, this, new_session, -+ boost::asio::placeholders::error)); -+ } -+ } -+ -+ void send(uint64_t session_id, std::string msg) { -+ auto it = session_map.find(session_id); -+ if (it != session_map.end()) { -+ it->second->send(msg); -+ } -+ } -+ -+private: -+ boost::asio::io_service& io_service_; -+ ProcessMsg server_process_msg; -+ stream_protocol::acceptor acceptor_; -+ uint64_t session_id = 1; -+ std::map<uint64_t, session_ptr> session_map; -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -new file mode 100644 -index 0000000..8e61aa9 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -@@ -0,0 +1,131 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H -+#define CACHE_CONTROLLER_SOCKET_CLIENT_H -+ -+#include <boost/asio.hpp> -+#include <boost/bind.hpp> -+#include <boost/algorithm/string.hpp> -+#include "include/assert.h" -+#include "CacheControllerSocketCommon.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+class CacheClient { -+public: -+ CacheClient(boost::asio::io_service& io_service, -+ const std::string& file, ClientProcessMsg processmsg) -+ : io_service_(io_service), -+ io_service_work_(io_service), -+ socket_(io_service), -+ m_client_process_msg(processmsg), -+ ep_(stream_protocol::endpoint(file)) -+ { -+ std::thread thd([this](){io_service_.run(); }); -+ thd.detach(); -+ } -+ -+ void run(){ -+ } -+ -+ int connect() { -+ try { -+ socket_.connect(ep_); -+ } catch (std::exception& e) { -+ return -1; -+ } -+ connected = true; -+ return 0; -+ } -+ -+ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { -+ // cache controller will init layout -+ rbdsc_req_type_t *message = new rbdsc_req_type_t(); -+ message->type = RBDSC_REGISTER; -+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -+ memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); -+ message->vol_size = vol_size; -+ message->offset = 0; -+ message->length = 0; -+ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -+ [this](const boost::system::error_code& err, size_t cb) { -+ if (!err) { -+ boost::asio::async_read(socket_, boost::asio::buffer(buffer_), -+ boost::asio::transfer_exactly(544), -+ [this](const boost::system::error_code& err, size_t cb) { -+ if (!err) { -+ m_client_process_msg(std::string(buffer_, cb)); -+ } else { -+ return -1; -+ } -+ }); -+ } else { -+ return -1; -+ } -+ }); -+ -+ return 0; -+ } -+ -+ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) { -+ rbdsc_req_type_t *message = new rbdsc_req_type_t(); -+ message->type = RBDSC_READ; -+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -+ memcpy(message->vol_name, object_id.c_str(), object_id.size()); -+ message->vol_size = 0; -+ message->offset = 0; -+ message->length = 0; -+ -+ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -+ [this, result](const boost::system::error_code& err, size_t cb) { -+ if (!err) { -+ get_result(result); -+ } else { -+ return -1; -+ } -+ }); -+ std::unique_lock<std::mutex> lk(m); -+ cv.wait(lk); -+ return 0; -+ } -+ -+ void get_result(bool* result) { -+ boost::asio::async_read(socket_, boost::asio::buffer(buffer_), -+ boost::asio::transfer_exactly(544), -+ [this, result](const boost::system::error_code& err, size_t cb) { -+ if (!err) { -+ *result = true; -+ cv.notify_one(); -+ m_client_process_msg(std::string(buffer_, cb)); -+ } else { -+ return -1; -+ } -+ }); -+ } -+ -+ void handle_connect(const boost::system::error_code& error) { -+ //TODO(): open librbd snap -+ } -+ -+ void handle_write(const boost::system::error_code& error) { -+ } -+ -+private: -+ boost::asio::io_service& io_service_; -+ boost::asio::io_service::work io_service_work_; -+ stream_protocol::socket socket_; -+ ClientProcessMsg m_client_process_msg; -+ stream_protocol::endpoint ep_; -+ char buffer_[1024]; -+ int block_size_ = 1024; -+ -+ std::condition_variable cv; -+ std::mutex m; -+ -+public: -+ bool connected = false; -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -new file mode 100644 -index 0000000..e253bb1 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h -@@ -0,0 +1,43 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H -+#define CACHE_CONTROLLER_SOCKET_COMMON_H -+ -+#define RBDSC_REGISTER 0X11 -+#define RBDSC_READ 0X12 -+#define RBDSC_LOOKUP 0X13 -+#define RBDSC_REGISTER_REPLY 0X14 -+#define RBDSC_READ_REPLY 0X15 -+#define RBDSC_LOOKUP_REPLY 0X16 -+#define RBDSC_READ_RADOS 0X16 -+ -+typedef std::function<void(uint64_t, std::string)> ProcessMsg; -+typedef std::function<void(std::string)> ClientProcessMsg; -+typedef uint8_t rbdsc_req_type; -+struct rbdsc_req_type_t { -+ rbdsc_req_type type; -+ uint64_t vol_size; -+ uint64_t offset; -+ uint64_t length; -+ char pool_name[256]; -+ char vol_name[256]; -+ -+ uint64_t size() { -+ return sizeof(rbdsc_req_type_t); -+ } -+ -+ std::string to_buffer() { -+ std::stringstream ss; -+ ss << type; -+ ss << vol_size; -+ ss << offset; -+ ss << length; -+ ss << pool_name; -+ ss << vol_name; -+ -+ return ss.str(); -+ } -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc -new file mode 100644 -index 0000000..90b407c ---- /dev/null -+++ b/src/tools/rbd_cache/ObjectCacheStore.cc -@@ -0,0 +1,147 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "ObjectCacheStore.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_rbd_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ -+ << __func__ << ": " -+ -+ -+ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) -+ : m_cct(cct), m_work_queue(work_queue), -+ m_cache_table_lock("rbd::cache::ObjectCacheStore"), -+ m_rados(new librados::Rados()) { -+} -+ -+ObjectCacheStore::~ObjectCacheStore() { -+ -+} -+ -+int ObjectCacheStore::init(bool reset) { -+ -+ int ret = m_rados->init_with_context(m_cct); -+ if(ret < 0) { -+ lderr(m_cct) << "fail to init Ceph context" << dendl; -+ return ret; -+ } -+ -+ ret = m_rados->connect(); -+ if(ret < 0 ) { -+ lderr(m_cct) << "fail to conect to cluster" << dendl; -+ return ret; -+ } -+ //TODO(): check existing cache objects -+ return ret; -+} -+ -+int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { -+ int ret = 0; -+ std::string cache_file_name = pool_name + object_name; -+ -+ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { -+ librados::IoCtx* io_ctx = new librados::IoCtx(); -+ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); -+ if (ret < 0) { -+ lderr(m_cct) << "fail to create ioctx" << dendl; -+ assert(0); -+ } -+ m_ioctxs.emplace(pool_name, io_ctx); -+ } -+ -+ assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); -+ -+ librados::IoCtx* ioctx = m_ioctxs[pool_name]; -+ -+ //promoting: update metadata -+ { -+ Mutex::Locker locker(m_cache_table_lock); -+ m_cache_table.emplace(cache_file_name, PROMOTING); -+ } -+ -+ librados::bufferlist read_buf; -+ int object_size = 4096*1024; //TODO(): read config from image metadata -+ -+ //TODO(): async promote -+ ret = promote_object(ioctx, object_name, read_buf, object_size); -+ if (ret == -ENOENT) { -+ read_buf.append(std::string(object_size, '0')); -+ ret = 0; -+ } -+ -+ if( ret < 0) { -+ lderr(m_cct) << "fail to read from rados" << dendl; -+ return ret; -+ } -+ -+ // persistent to cache -+ os::CacheStore::SyncFile cache_file(m_cct, cache_file_name); -+ cache_file.open(); -+ ret = cache_file.write_object_to_file(read_buf, object_size); -+ -+ assert(m_cache_table.find(cache_file_name) != m_cache_table.end()); -+ -+ // update metadata -+ { -+ Mutex::Locker locker(m_cache_table_lock); -+ m_cache_table.emplace(cache_file_name, PROMOTED); -+ } -+ -+ return ret; -+ -+} -+ -+int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { -+ -+ std::string cache_file_name = pool_name + object_name; -+ { -+ Mutex::Locker locker(m_cache_table_lock); -+ -+ auto it = m_cache_table.find(cache_file_name); -+ if (it != m_cache_table.end()) { -+ -+ if (it->second == PROMOTING) { -+ return -1; -+ } else if (it->second == PROMOTED) { -+ return 0; -+ } else { -+ assert(0); -+ } -+ } -+ } -+ -+ int ret = do_promote(pool_name, object_name); -+ -+ return ret; -+} -+ -+int ObjectCacheStore::shutdown() { -+ m_rados->shutdown(); -+ return 0; -+} -+ -+int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { -+ return 0; -+} -+ -+int ObjectCacheStore::lock_cache(std::string vol_name) { -+ return 0; -+} -+ -+int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist read_buf, uint64_t read_len) { -+ int ret; -+ -+ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); -+ -+ ret = ioctx->aio_read(object_name, read_completion, &read_buf, read_len, 0); -+ if(ret < 0) { -+ lderr(m_cct) << "fail to read from rados" << dendl; -+ return ret; -+ } -+ read_completion->wait_for_complete(); -+ ret = read_completion->get_return_value(); -+ return ret; -+ -+} -diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h -new file mode 100644 -index 0000000..12f8399 ---- /dev/null -+++ b/src/tools/rbd_cache/ObjectCacheStore.h -@@ -0,0 +1,65 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef OBJECT_CACHE_STORE_H -+#define OBJECT_CACHE_STORE_H -+ -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+#include "os/CacheStore/SyncFile.h" -+ -+using librados::Rados; -+using librados::IoCtx; -+ -+typedef shared_ptr<librados::Rados> RadosRef; -+typedef shared_ptr<librados::IoCtx> IoCtxRef; -+ -+class ObjectCacheStore -+{ -+ public: -+ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); -+ ~ObjectCacheStore(); -+ -+ int init(bool reset); -+ -+ int shutdown(); -+ -+ int lookup_object(std::string pool_name, std::string object_name); -+ -+ int init_cache(std::string vol_name, uint64_t vol_size); -+ -+ int lock_cache(std::string vol_name); -+ -+ private: -+ int _evict_object(); -+ -+ int do_promote(std::string pool_name, std::string object_name); -+ -+ int promote_object(librados::IoCtx*, std::string object_name, -+ librados::bufferlist read_buf, -+ uint64_t length); -+ -+ enum { -+ PROMOTING = 0, -+ PROMOTED, -+ }; -+ -+ CephContext *m_cct; -+ ContextWQ* m_work_queue; -+ Mutex m_cache_table_lock; -+ RadosRef m_rados; -+ -+ std::map<std::string, uint8_t> m_cache_table; -+ -+ std::map<std::string, librados::IoCtx*> m_ioctxs; -+ -+ os::CacheStore::SyncFile *m_cache_file; -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc -new file mode 100644 -index 0000000..336a581 ---- /dev/null -+++ b/src/tools/rbd_cache/main.cc -@@ -0,0 +1,85 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/ceph_argparse.h" -+#include "common/config.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "global/global_init.h" -+#include "global/signal_handler.h" -+#include "CacheController.hpp" -+ -+#include <vector> -+ -+CacheController *cachectl = nullptr; -+ -+void usage() { -+ std::cout << "usage: cache controller [options...]" << std::endl; -+ std::cout << "options:\n"; -+ std::cout << " -m monaddress[:port] connect to specified monitor\n"; -+ std::cout << " --keyring=<path> path to keyring for local cluster\n"; -+ std::cout << " --log-file=<logfile> file to log debug output\n"; -+ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; -+ generic_server_usage(); -+} -+ -+static void handle_signal(int signum) -+{ -+ if (cachectl) -+ cachectl->handle_signal(signum); -+} -+ -+int main(int argc, const char **argv) -+{ -+ std::vector<const char*> args; -+ env_to_vec(args); -+ argv_to_vec(argc, argv, args); -+ -+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, -+ CODE_ENVIRONMENT_DAEMON, -+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); -+ -+ for (auto i = args.begin(); i != args.end(); ++i) { -+ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { -+ usage(); -+ return EXIT_SUCCESS; -+ } -+ } -+ -+ if (g_conf->daemonize) { -+ global_init_daemonize(g_ceph_context); -+ } -+ g_ceph_context->enable_perf_counter(); -+ -+ common_init_finish(g_ceph_context); -+ -+ init_async_signal_handler(); -+ register_async_signal_handler(SIGHUP, sighup_handler); -+ register_async_signal_handler_oneshot(SIGINT, handle_signal); -+ register_async_signal_handler_oneshot(SIGTERM, handle_signal); -+ -+ std::vector<const char*> cmd_args; -+ argv_to_vec(argc, argv, cmd_args); -+ -+ // disable unnecessary librbd cache -+ g_ceph_context->_conf->set_val_or_die("rbd_cache", "false"); -+ -+ cachectl = new CacheController(g_ceph_context, cmd_args); -+ int r = cachectl->init(); -+ if (r < 0) { -+ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; -+ goto cleanup; -+ } -+ -+ cachectl->run(); -+ -+ cleanup: -+ unregister_async_signal_handler(SIGHUP, sighup_handler); -+ unregister_async_signal_handler(SIGINT, handle_signal); -+ unregister_async_signal_handler(SIGTERM, handle_signal); -+ shutdown_async_signal_handler(); -+ -+ delete cachectl; -+ -+ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; -+} --- -2.7.4 - diff --git a/src/ceph/0002-librbd-cleanup-rbd-shared-RO-cache.patch b/src/ceph/0002-librbd-cleanup-rbd-shared-RO-cache.patch deleted file mode 100644 index 7fb78e3..0000000 --- a/src/ceph/0002-librbd-cleanup-rbd-shared-RO-cache.patch +++ /dev/null @@ -1,847 +0,0 @@ -From 26f4a0804c035895fd77e9a70f47ede3f4512bde Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Wed, 20 Jun 2018 11:34:17 +0800 -Subject: [PATCH 02/10] librbd: cleanup rbd shared RO cache - -- adding namespace for rbd cache controller -- move SyncFile code under librbd/cache/ - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/librbd/CMakeLists.txt | 2 +- - src/librbd/cache/SharedPersistentObjectCacher.cc | 4 +- - src/librbd/cache/SharedPersistentObjectCacher.h | 6 +- - .../cache/SharedPersistentObjectCacherFile.cc | 114 +++++++++++++++++++++ - .../cache/SharedPersistentObjectCacherFile.h | 74 +++++++++++++ - .../SharedPersistentObjectCacherObjectDispatch.cc | 4 +- - .../SharedPersistentObjectCacherObjectDispatch.h | 2 +- - src/os/CacheStore/SyncFile.cc | 110 -------------------- - src/os/CacheStore/SyncFile.h | 74 ------------- - src/tools/rbd_cache/CMakeLists.txt | 2 +- - src/tools/rbd_cache/CacheController.cc | 9 +- - src/tools/rbd_cache/CacheController.h | 55 ++++++++++ - src/tools/rbd_cache/CacheController.hpp | 49 --------- - src/tools/rbd_cache/CacheControllerSocket.hpp | 6 ++ - .../rbd_cache/CacheControllerSocketClient.hpp | 5 + - src/tools/rbd_cache/CacheControllerSocketCommon.h | 5 + - src/tools/rbd_cache/ObjectCacheStore.cc | 7 +- - src/tools/rbd_cache/ObjectCacheStore.h | 9 +- - src/tools/rbd_cache/main.cc | 6 +- - 19 files changed, 293 insertions(+), 250 deletions(-) - create mode 100644 src/librbd/cache/SharedPersistentObjectCacherFile.cc - create mode 100644 src/librbd/cache/SharedPersistentObjectCacherFile.h - delete mode 100644 src/os/CacheStore/SyncFile.cc - delete mode 100644 src/os/CacheStore/SyncFile.h - create mode 100644 src/tools/rbd_cache/CacheController.h - delete mode 100644 src/tools/rbd_cache/CacheController.hpp - -diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt -index 92539a8..540ee78 100644 ---- a/src/librbd/CMakeLists.txt -+++ b/src/librbd/CMakeLists.txt -@@ -34,6 +34,7 @@ set(librbd_internal_srcs - cache/ObjectCacherObjectDispatch.cc - cache/SharedPersistentObjectCacherObjectDispatch.cc - cache/SharedPersistentObjectCacher.cc -+ cache/SharedPersistentObjectCacherFile.cc - deep_copy/ImageCopyRequest.cc - deep_copy/MetadataCopyRequest.cc - deep_copy/ObjectCopyRequest.cc -@@ -124,7 +125,6 @@ set(librbd_internal_srcs - trash/MoveRequest.cc - watcher/Notifier.cc - watcher/RewatchRequest.cc -- ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc - ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) - - add_library(rbd_api STATIC librbd.cc) -diff --git a/src/librbd/cache/SharedPersistentObjectCacher.cc b/src/librbd/cache/SharedPersistentObjectCacher.cc -index a849260..260567c 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacher.cc -+++ b/src/librbd/cache/SharedPersistentObjectCacher.cc -@@ -19,7 +19,7 @@ SharedPersistentObjectCacher<I>::SharedPersistentObjectCacher(I *image_ctx, std: - : m_image_ctx(image_ctx), m_cache_path(cache_path), - m_file_map_lock("librbd::cache::SharedObjectCacher::filemaplock") { - auto *cct = m_image_ctx->cct; -- -+ ldout(cct, 20) << dendl; - } - - template <typename I> -@@ -40,7 +40,7 @@ int SharedPersistentObjectCacher<I>::read_object(std::string oid, ceph::bufferli - std::string cache_file_name = m_image_ctx->data_ctx.get_pool_name() + oid; - - //TODO(): make a cache for cachefile fd -- os::CacheStore::SyncFile* target_cache_file = new os::CacheStore::SyncFile(cct, cache_file_name); -+ SyncFile* target_cache_file = new SyncFile(cct, cache_file_name); - target_cache_file->open(); - - int ret = target_cache_file->read_object_from_file(read_data, offset, length); -diff --git a/src/librbd/cache/SharedPersistentObjectCacher.h b/src/librbd/cache/SharedPersistentObjectCacher.h -index d108a05..af04e63 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacher.h -+++ b/src/librbd/cache/SharedPersistentObjectCacher.h -@@ -6,7 +6,7 @@ - - #include "include/buffer_fwd.h" - #include "include/int_types.h" --#include "os/CacheStore/SyncFile.h" -+#include "SharedPersistentObjectCacherFile.h" - #include "common/Mutex.h" - #include <vector> - #include <map> -@@ -31,9 +31,9 @@ public: - - private: - ImageCtxT *m_image_ctx; -- std::map<std::string, os::CacheStore::SyncFile*> file_map; -- Mutex m_file_map_lock; - std::string m_cache_path; -+ Mutex m_file_map_lock; -+ std::map<std::string, SyncFile*> file_map; - - }; - -diff --git a/src/librbd/cache/SharedPersistentObjectCacherFile.cc b/src/librbd/cache/SharedPersistentObjectCacherFile.cc -new file mode 100644 -index 0000000..75a3053 ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacherFile.cc -@@ -0,0 +1,114 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "SharedPersistentObjectCacherFile.h" -+#include "include/Context.h" -+#include "common/dout.h" -+#include "common/WorkQueue.h" -+#include "librbd/ImageCtx.h" -+#include <sys/types.h> -+#include <sys/stat.h> -+#include <aio.h> -+#include <errno.h> -+#include <fcntl.h> -+#include <utility> -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::cache::SyncFile: " << this << " " \ -+ << __func__ << ": " -+ -+namespace librbd { -+namespace cache { -+ -+SyncFile::SyncFile(CephContext *cct, const std::string &name) -+ : cct(cct) { -+ m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; -+ ldout(cct, 20) << "file path=" << m_name << dendl; -+} -+ -+SyncFile::~SyncFile() { -+ // TODO force proper cleanup -+ if (m_fd != -1) { -+ ::close(m_fd); -+ } -+} -+ -+void SyncFile::open(Context *on_finish) { -+ while (true) { -+ m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC, -+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); -+ if (m_fd == -1) { -+ int r = -errno; -+ if (r == -EINTR) { -+ continue; -+ } -+ on_finish->complete(r); -+ return; -+ } -+ break; -+ } -+ -+ on_finish->complete(0); -+} -+ -+void SyncFile::open() { -+ while (true) -+ { -+ m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC, -+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); -+ if (m_fd == -1) -+ { -+ int r = -errno; -+ if (r == -EINTR) { -+ continue; -+ } -+ return; -+ } -+ break; -+ } -+} -+ -+void SyncFile::read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish) { -+ on_finish->complete(read_object_from_file(bl, offset, length)); -+} -+ -+void SyncFile::write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish) { -+ on_finish->complete(write_object_to_file(bl, bl.length())); -+} -+ -+int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) { -+ -+ ldout(cct, 20) << "cache file name:" << m_name -+ << ", length:" << object_len << dendl; -+ -+ // TODO(): aio -+ int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0); -+ if(ret < 0) { -+ lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl; -+ return ret; -+ } -+ -+ return ret; -+} -+ -+int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) { -+ -+ ldout(cct, 20) << "offset:" << object_off -+ << ", length:" << object_len << dendl; -+ -+ bufferptr buf(object_len); -+ -+ // TODO(): aio -+ int ret = pread(m_fd, buf.c_str(), object_len, object_off); -+ if(ret < 0) { -+ lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl; -+ return ret; -+ } -+ read_buf->append(std::move(buf)); -+ -+ return ret; -+} -+ -+} // namespace cache -+} // namespace librbd -diff --git a/src/librbd/cache/SharedPersistentObjectCacherFile.h b/src/librbd/cache/SharedPersistentObjectCacherFile.h -new file mode 100644 -index 0000000..ccbe730 ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacherFile.h -@@ -0,0 +1,74 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBRBD_CACHE_STORE_SYNC_FILE -+#define CEPH_LIBRBD_CACHE_STORE_SYNC_FILE -+ -+#include "include/buffer_fwd.h" -+#include <sys/mman.h> -+#include <string> -+ -+struct Context; -+struct ContextWQ; -+class CephContext; -+ -+namespace librbd { -+ -+namespace cache { -+ -+class SyncFile { -+public: -+ SyncFile(CephContext *cct, const std::string &name); -+ ~SyncFile(); -+ -+ // TODO use IO queue instead of individual commands so operations can be -+ // submitted in batch -+ -+ // TODO use scatter/gather API -+ -+ void open(Context *on_finish); -+ -+ // ## -+ void open(); -+ bool try_open(); -+ void close(Context *on_finish); -+ void remove(Context *on_finish); -+ -+ void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish); -+ -+ void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish); -+ -+ void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish); -+ -+ void truncate(uint64_t length, bool fdatasync, Context *on_finish); -+ -+ void fsync(Context *on_finish); -+ -+ void fdatasync(Context *on_finish); -+ -+ uint64_t filesize(); -+ -+ int load(void** dest, uint64_t filesize); -+ -+ int remove(); -+ -+ // ## -+ int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len); -+ int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len); -+ -+private: -+ CephContext *cct; -+ std::string m_name; -+ int m_fd = -1; -+ -+ int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync); -+ int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl); -+ int discard(uint64_t offset, uint64_t length, bool fdatasync); -+ int truncate(uint64_t length, bool fdatasync); -+ int fdatasync(); -+}; -+ -+} // namespace cache -+} // namespace librbd -+ -+#endif // CEPH_LIBRBD_CACHE_STORE_SYNC_FILE -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -index 90d886c..2aa5cad 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -@@ -52,7 +52,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() { - ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; - - std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -- m_cache_client = new CacheClient(io_service, controller_path.c_str(), -+ m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(), - ([&](std::string s){client_handle_request(s);})); - - int ret = m_cache_client->connect(); -@@ -120,7 +120,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s - auto cct = m_image_ctx->cct; - ldout(cct, 20) << dendl; - -- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -+ rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); - - switch (io_ctx->type) { - case RBDSC_REGISTER_REPLY: { -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -index 1ede804..200688f 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -@@ -115,7 +115,7 @@ private: - ImageCtxT* m_image_ctx; - - void client_handle_request(std::string msg); -- CacheClient *m_cache_client = nullptr; -+ rbd::cache::CacheClient *m_cache_client = nullptr; - boost::asio::io_service io_service; - }; - -diff --git a/src/os/CacheStore/SyncFile.cc b/src/os/CacheStore/SyncFile.cc -deleted file mode 100644 -index 5352bde..0000000 ---- a/src/os/CacheStore/SyncFile.cc -+++ /dev/null -@@ -1,110 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "os/CacheStore/SyncFile.h" --#include "include/Context.h" --#include "common/dout.h" --#include "common/WorkQueue.h" --#include "librbd/ImageCtx.h" --#include <sys/types.h> --#include <sys/stat.h> --#include <aio.h> --#include <errno.h> --#include <fcntl.h> --#include <utility> -- --#define dout_subsys ceph_subsys_rbd --#undef dout_prefix --#define dout_prefix *_dout << "librbd::file::SyncFile: " << this << " " \ -- << __func__ << ": " -- --namespace os { --namespace CacheStore { -- --SyncFile::SyncFile(CephContext *cct, const std::string &name) -- : cct(cct) --{ -- m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; -- ldout(cct, 20) << "file path=" << m_name << dendl; --} -- --SyncFile::~SyncFile() --{ -- // TODO force proper cleanup -- if (m_fd != -1) { -- ::close(m_fd); -- } --} -- --void SyncFile::open(Context *on_finish) --{ -- while (true) { -- m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC, -- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); -- if (m_fd == -1) { -- int r = -errno; -- if (r == -EINTR) { -- continue; -- } -- on_finish->complete(r); -- return; -- } -- break; -- } -- -- on_finish->complete(0); --} -- --void SyncFile::open() --{ -- while (true) -- { -- m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC, -- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); -- if (m_fd == -1) -- { -- int r = -errno; -- if (r == -EINTR) { -- continue; -- } -- return; -- } -- break; -- } --} -- --int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) { -- -- ldout(cct, 20) << "cache file name:" << m_name -- << ", length:" << object_len << dendl; -- -- // TODO(): aio -- int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0); -- if(ret < 0) { -- lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl; -- return ret; -- } -- -- return ret; --} -- --int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) { -- -- ldout(cct, 20) << "offset:" << object_off -- << ", length:" << object_len << dendl; -- -- bufferptr buf(object_len); -- -- // TODO(): aio -- int ret = pread(m_fd, buf.c_str(), object_len, object_off); -- if(ret < 0) { -- lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl; -- return ret; -- } -- read_buf->append(std::move(buf)); -- -- return ret; --} -- --} // namespace CacheStore --} // namespace os -diff --git a/src/os/CacheStore/SyncFile.h b/src/os/CacheStore/SyncFile.h -deleted file mode 100644 -index 81602ce..0000000 ---- a/src/os/CacheStore/SyncFile.h -+++ /dev/null -@@ -1,74 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CEPH_LIBOS_CACHE_STORE_SYNC_FILE --#define CEPH_LIBOS_CACHE_STORE_SYNC_FILE -- --#include "include/buffer_fwd.h" --#include <sys/mman.h> --#include <string> -- --struct Context; --struct ContextWQ; --class CephContext; -- --namespace os { -- --namespace CacheStore { -- --class SyncFile { --public: -- SyncFile(CephContext *cct, const std::string &name); -- ~SyncFile(); -- -- // TODO use IO queue instead of individual commands so operations can be -- // submitted in batch -- -- // TODO use scatter/gather API -- -- void open(Context *on_finish); -- -- // ## -- void open(); -- bool try_open(); -- void close(Context *on_finish); -- void remove(Context *on_finish); -- -- void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish); -- -- void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish); -- -- void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish); -- -- void truncate(uint64_t length, bool fdatasync, Context *on_finish); -- -- void fsync(Context *on_finish); -- -- void fdatasync(Context *on_finish); -- -- uint64_t filesize(); -- -- int load(void** dest, uint64_t filesize); -- -- int remove(); -- -- // ## -- int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len); -- int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len); -- --private: -- CephContext *cct; -- std::string m_name; -- int m_fd = -1; -- -- int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync); -- int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl); -- int discard(uint64_t offset, uint64_t length, bool fdatasync); -- int truncate(uint64_t length, bool fdatasync); -- int fdatasync(); --}; -- --} // namespace CacheStore --} // namespace os -- --#endif // CEPH_LIBOS_CACHE_STORE_SYNC_FILE -diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt -index 08eae60..597d802 100644 ---- a/src/tools/rbd_cache/CMakeLists.txt -+++ b/src/tools/rbd_cache/CMakeLists.txt -@@ -1,5 +1,5 @@ - add_executable(rbd-cache -- ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc -+ ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc - ObjectCacheStore.cc - CacheController.cc - main.cc) -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -index c914358..e73ba25 100644 ---- a/src/tools/rbd_cache/CacheController.cc -+++ b/src/tools/rbd_cache/CacheController.cc -@@ -1,7 +1,7 @@ - // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- - // vim: ts=8 sw=2 smarttab - --#include "CacheController.hpp" -+#include "CacheController.h" - - #define dout_context g_ceph_context - #define dout_subsys ceph_subsys_rbd_cache -@@ -9,6 +9,8 @@ - #define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ - << __func__ << ": " - -+namespace rbd { -+namespace cache { - - class ThreadPoolSingleton : public ThreadPool { - public: -@@ -103,3 +105,8 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ - - } - } -+ -+} // namespace rbd -+} // namespace cache -+ -+ -diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h -new file mode 100644 -index 0000000..0e3abc1 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheController.h -@@ -0,0 +1,55 @@ -+#ifndef CACHE_CONTROLLER_H -+#define CACHE_CONTROLLER_H -+ -+#include <thread> -+ -+#include "common/Formatter.h" -+#include "common/admin_socket.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "common/WorkQueue.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "include/assert.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+ -+#include "CacheControllerSocket.hpp" -+#include "ObjectCacheStore.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace rbd { -+namespace cache { -+ -+class CacheController { -+ public: -+ CacheController(CephContext *cct, const std::vector<const char*> &args); -+ ~CacheController(); -+ -+ int init(); -+ -+ int shutdown(); -+ -+ void handle_signal(int sinnum); -+ -+ void run(); -+ -+ void handle_request(uint64_t sesstion_id, std::string msg); -+ -+ private: -+ boost::asio::io_service io_service; -+ CacheServer *m_cache_server; -+ std::vector<const char*> m_args; -+ CephContext *m_cct; -+ ObjectCacheStore *m_object_cache_store; -+ ContextWQ* pcache_op_work_queue; -+}; -+ -+} // namespace rbd -+} // namespace cache -+ -+#endif -diff --git a/src/tools/rbd_cache/CacheController.hpp b/src/tools/rbd_cache/CacheController.hpp -deleted file mode 100644 -index 97113e4..0000000 ---- a/src/tools/rbd_cache/CacheController.hpp -+++ /dev/null -@@ -1,49 +0,0 @@ --#ifndef CACHE_CONTROLLER_H --#define CACHE_CONTROLLER_H -- --#include <thread> -- --#include "common/Formatter.h" --#include "common/admin_socket.h" --#include "common/debug.h" --#include "common/errno.h" --#include "common/ceph_context.h" --#include "common/Mutex.h" --#include "common/WorkQueue.h" --#include "include/rados/librados.hpp" --#include "include/rbd/librbd.h" --#include "include/assert.h" --#include "librbd/ImageCtx.h" --#include "librbd/ImageState.h" -- --#include "CacheControllerSocket.hpp" --#include "ObjectCacheStore.h" -- -- --using boost::asio::local::stream_protocol; -- --class CacheController { -- public: -- CacheController(CephContext *cct, const std::vector<const char*> &args); -- ~CacheController(); -- -- int init(); -- -- int shutdown(); -- -- void handle_signal(int sinnum); -- -- void run(); -- -- void handle_request(uint64_t sesstion_id, std::string msg); -- -- private: -- boost::asio::io_service io_service; -- CacheServer *m_cache_server; -- std::vector<const char*> m_args; -- CephContext *m_cct; -- ObjectCacheStore *m_object_cache_store; -- ContextWQ* pcache_op_work_queue; --}; -- --#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp -index 6e1a743..967af1d 100644 ---- a/src/tools/rbd_cache/CacheControllerSocket.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp -@@ -17,6 +17,9 @@ - - using boost::asio::local::stream_protocol; - -+namespace rbd { -+namespace cache { -+ - class session : public std::enable_shared_from_this<session> { - public: - session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) -@@ -122,4 +125,7 @@ private: - std::map<uint64_t, session_ptr> session_map; - }; - -+} // namespace cache -+} // namespace rbd -+ - #endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -index 8e61aa9..57be78e 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -@@ -13,6 +13,9 @@ - - using boost::asio::local::stream_protocol; - -+namespace rbd { -+namespace cache { -+ - class CacheClient { - public: - CacheClient(boost::asio::io_service& io_service, -@@ -128,4 +131,6 @@ public: - bool connected = false; - }; - -+} // namespace cache -+} // namespace rbd - #endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -index e253bb1..ab89155 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h -+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h -@@ -12,6 +12,9 @@ - #define RBDSC_LOOKUP_REPLY 0X16 - #define RBDSC_READ_RADOS 0X16 - -+namespace rbd { -+namespace cache { -+ - typedef std::function<void(uint64_t, std::string)> ProcessMsg; - typedef std::function<void(std::string)> ClientProcessMsg; - typedef uint8_t rbdsc_req_type; -@@ -40,4 +43,6 @@ struct rbdsc_req_type_t { - } - }; - -+} // namespace cache -+} // namespace rbd - #endif -diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc -index 90b407c..9572a1a 100644 ---- a/src/tools/rbd_cache/ObjectCacheStore.cc -+++ b/src/tools/rbd_cache/ObjectCacheStore.cc -@@ -9,6 +9,8 @@ - #define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ - << __func__ << ": " - -+namespace rbd { -+namespace cache { - - ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) - : m_cct(cct), m_work_queue(work_queue), -@@ -77,7 +79,7 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) - } - - // persistent to cache -- os::CacheStore::SyncFile cache_file(m_cct, cache_file_name); -+ librbd::cache::SyncFile cache_file(m_cct, cache_file_name); - cache_file.open(); - ret = cache_file.write_object_to_file(read_buf, object_size); - -@@ -145,3 +147,6 @@ int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_ - return ret; - - } -+ -+} // namespace cache -+} // namespace rbd -diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h -index 12f8399..a81beea 100644 ---- a/src/tools/rbd_cache/ObjectCacheStore.h -+++ b/src/tools/rbd_cache/ObjectCacheStore.h -@@ -12,11 +12,14 @@ - #include "include/rbd/librbd.h" - #include "librbd/ImageCtx.h" - #include "librbd/ImageState.h" --#include "os/CacheStore/SyncFile.h" -+#include "librbd/cache/SharedPersistentObjectCacherFile.h" - - using librados::Rados; - using librados::IoCtx; - -+namespace rbd { -+namespace cache { -+ - typedef shared_ptr<librados::Rados> RadosRef; - typedef shared_ptr<librados::IoCtx> IoCtxRef; - -@@ -59,7 +62,9 @@ class ObjectCacheStore - - std::map<std::string, librados::IoCtx*> m_ioctxs; - -- os::CacheStore::SyncFile *m_cache_file; -+ librbd::cache::SyncFile *m_cache_file; - }; - -+} // namespace rbd -+} // namespace cache - #endif -diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc -index 336a581..a7c5b64 100644 ---- a/src/tools/rbd_cache/main.cc -+++ b/src/tools/rbd_cache/main.cc -@@ -7,11 +7,11 @@ - #include "common/errno.h" - #include "global/global_init.h" - #include "global/signal_handler.h" --#include "CacheController.hpp" -+#include "CacheController.h" - - #include <vector> - --CacheController *cachectl = nullptr; -+rbd::cache::CacheController *cachectl = nullptr; - - void usage() { - std::cout << "usage: cache controller [options...]" << std::endl; -@@ -64,7 +64,7 @@ int main(int argc, const char **argv) - // disable unnecessary librbd cache - g_ceph_context->_conf->set_val_or_die("rbd_cache", "false"); - -- cachectl = new CacheController(g_ceph_context, cmd_args); -+ cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args); - int r = cachectl->init(); - if (r < 0) { - std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; --- -2.7.4 - diff --git a/src/ceph/0003-librbd-fix-bufferlist-point.patch b/src/ceph/0003-librbd-fix-bufferlist-point.patch deleted file mode 100644 index b0030ea..0000000 --- a/src/ceph/0003-librbd-fix-bufferlist-point.patch +++ /dev/null @@ -1,71 +0,0 @@ -From 656b499b00fb237c83b8ccccc519a1577d981199 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Mon, 6 Aug 2018 15:38:35 +0800 -Subject: [PATCH 03/10] librbd: fix bufferlist point - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/tools/rbd_cache/ObjectCacheStore.cc | 10 +++++----- - src/tools/rbd_cache/ObjectCacheStore.h | 2 +- - 2 files changed, 6 insertions(+), 6 deletions(-) - -diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc -index 9572a1a..2a87469 100644 ---- a/src/tools/rbd_cache/ObjectCacheStore.cc -+++ b/src/tools/rbd_cache/ObjectCacheStore.cc -@@ -63,13 +63,13 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) - m_cache_table.emplace(cache_file_name, PROMOTING); - } - -- librados::bufferlist read_buf; -+ librados::bufferlist* read_buf = new librados::bufferlist(); - int object_size = 4096*1024; //TODO(): read config from image metadata - - //TODO(): async promote - ret = promote_object(ioctx, object_name, read_buf, object_size); - if (ret == -ENOENT) { -- read_buf.append(std::string(object_size, '0')); -+ read_buf->append(std::string(object_size, '0')); - ret = 0; - } - -@@ -81,7 +81,7 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) - // persistent to cache - librbd::cache::SyncFile cache_file(m_cct, cache_file_name); - cache_file.open(); -- ret = cache_file.write_object_to_file(read_buf, object_size); -+ ret = cache_file.write_object_to_file(*read_buf, object_size); - - assert(m_cache_table.find(cache_file_name) != m_cache_table.end()); - -@@ -132,12 +132,12 @@ int ObjectCacheStore::lock_cache(std::string vol_name) { - return 0; - } - --int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist read_buf, uint64_t read_len) { -+int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { - int ret; - - librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); - -- ret = ioctx->aio_read(object_name, read_completion, &read_buf, read_len, 0); -+ ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); - if(ret < 0) { - lderr(m_cct) << "fail to read from rados" << dendl; - return ret; -diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h -index a81beea..db09efa 100644 ---- a/src/tools/rbd_cache/ObjectCacheStore.h -+++ b/src/tools/rbd_cache/ObjectCacheStore.h -@@ -45,7 +45,7 @@ class ObjectCacheStore - int do_promote(std::string pool_name, std::string object_name); - - int promote_object(librados::IoCtx*, std::string object_name, -- librados::bufferlist read_buf, -+ librados::bufferlist* read_buf, - uint64_t length); - - enum { --- -2.7.4 - diff --git a/src/ceph/0004-librbd-fix-lookup-object-return.patch b/src/ceph/0004-librbd-fix-lookup-object-return.patch deleted file mode 100644 index 0a2e2eb..0000000 --- a/src/ceph/0004-librbd-fix-lookup-object-return.patch +++ /dev/null @@ -1,45 +0,0 @@ -From 779ef67e6a401ad569c5d3d3a076352db8ad0f67 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Mon, 6 Aug 2018 15:47:23 +0800 -Subject: [PATCH 04/10] librbd: fix lookup object return - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/tools/rbd_cache/CacheControllerSocketClient.hpp | 7 ++++++- - src/tools/rbd_cache/CacheControllerSocketCommon.h | 2 +- - 2 files changed, 7 insertions(+), 2 deletions(-) - -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -index 57be78e..4e1f36c 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -@@ -99,7 +99,12 @@ public: - boost::asio::transfer_exactly(544), - [this, result](const boost::system::error_code& err, size_t cb) { - if (!err) { -- *result = true; -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); -+ if (io_ctx->type == RBDSC_READ_REPLY) { -+ *result = true; -+ } else { -+ *result = false; -+ } - cv.notify_one(); - m_client_process_msg(std::string(buffer_, cb)); - } else { -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -index ab89155..a9d73a8 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h -+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h -@@ -10,7 +10,7 @@ - #define RBDSC_REGISTER_REPLY 0X14 - #define RBDSC_READ_REPLY 0X15 - #define RBDSC_LOOKUP_REPLY 0X16 --#define RBDSC_READ_RADOS 0X16 -+#define RBDSC_READ_RADOS 0X17 - - namespace rbd { - namespace cache { --- -2.7.4 - diff --git a/src/ceph/0005-librbd-fix-conf-get_val.patch b/src/ceph/0005-librbd-fix-conf-get_val.patch deleted file mode 100644 index 6bc5268..0000000 --- a/src/ceph/0005-librbd-fix-conf-get_val.patch +++ /dev/null @@ -1,63 +0,0 @@ -From be47f5e7980d92379b9feaef2e2f1e1579df94ba Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Mon, 6 Aug 2018 22:05:13 +0800 -Subject: [PATCH 05/10] librbd: fix conf get_val - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/librbd/cache/SharedPersistentObjectCacherFile.cc | 2 +- - src/tools/rbd_cache/CacheController.cc | 2 +- - src/tools/rbd_cache/main.cc | 4 ++-- - 3 files changed, 4 insertions(+), 4 deletions(-) - -diff --git a/src/librbd/cache/SharedPersistentObjectCacherFile.cc b/src/librbd/cache/SharedPersistentObjectCacherFile.cc -index 75a3053..6f9c3e0 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherFile.cc -+++ b/src/librbd/cache/SharedPersistentObjectCacherFile.cc -@@ -23,7 +23,7 @@ namespace cache { - - SyncFile::SyncFile(CephContext *cct, const std::string &name) - : cct(cct) { -- m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; -+ m_name = cct->_conf.get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; - ldout(cct, 20) << "file path=" << m_name << dendl; - } - -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -index e73ba25..cefcf28 100644 ---- a/src/tools/rbd_cache/CacheController.cc -+++ b/src/tools/rbd_cache/CacheController.cc -@@ -20,7 +20,7 @@ public: - : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, - "pcache_threads"), - op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", -- cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"), -+ cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), - this)) { - start(); - } -diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc -index a7c5b64..d604760 100644 ---- a/src/tools/rbd_cache/main.cc -+++ b/src/tools/rbd_cache/main.cc -@@ -46,7 +46,7 @@ int main(int argc, const char **argv) - } - } - -- if (g_conf->daemonize) { -+ if (g_conf()->daemonize) { - global_init_daemonize(g_ceph_context); - } - g_ceph_context->enable_perf_counter(); -@@ -62,7 +62,7 @@ int main(int argc, const char **argv) - argv_to_vec(argc, argv, cmd_args); - - // disable unnecessary librbd cache -- g_ceph_context->_conf->set_val_or_die("rbd_cache", "false"); -+ g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); - - cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args); - int r = cachectl->init(); --- -2.7.4 - diff --git a/src/ceph/0006-librbd-LRU-policy-based-eviction.patch b/src/ceph/0006-librbd-LRU-policy-based-eviction.patch deleted file mode 100644 index eb99e3a..0000000 --- a/src/ceph/0006-librbd-LRU-policy-based-eviction.patch +++ /dev/null @@ -1,403 +0,0 @@ -From b233d6540160c8bc5cc25b870c2140fa48776fa6 Mon Sep 17 00:00:00 2001 -From: Dehao Shang <dehao.shang@intel.com> -Date: Mon, 6 Aug 2018 22:42:38 +0800 -Subject: [PATCH 06/10] librbd: LRU policy based eviction - -Signed-off-by: Dehao Shang <dehao.shang@intel.com> -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/tools/rbd_cache/ObjectCacheStore.cc | 73 ++++++++----- - src/tools/rbd_cache/ObjectCacheStore.h | 14 +-- - src/tools/rbd_cache/Policy.hpp | 22 ++++ - src/tools/rbd_cache/SimplePolicy.hpp | 180 ++++++++++++++++++++++++++++++++ - 4 files changed, 254 insertions(+), 35 deletions(-) - create mode 100644 src/tools/rbd_cache/Policy.hpp - create mode 100644 src/tools/rbd_cache/SimplePolicy.hpp - -diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc -index 2a87469..b39fe66 100644 ---- a/src/tools/rbd_cache/ObjectCacheStore.cc -+++ b/src/tools/rbd_cache/ObjectCacheStore.cc -@@ -14,12 +14,12 @@ namespace cache { - - ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) - : m_cct(cct), m_work_queue(work_queue), -- m_cache_table_lock("rbd::cache::ObjectCacheStore"), - m_rados(new librados::Rados()) { -+ m_policy = new SimplePolicy(4096, 0.9); // TODO - } - - ObjectCacheStore::~ObjectCacheStore() { -- -+ delete m_policy; - } - - int ObjectCacheStore::init(bool reset) { -@@ -43,6 +43,7 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) - int ret = 0; - std::string cache_file_name = pool_name + object_name; - -+ //TODO(): lock on ioctx map - if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { - librados::IoCtx* io_ctx = new librados::IoCtx(); - ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); -@@ -58,10 +59,8 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) - librados::IoCtx* ioctx = m_ioctxs[pool_name]; - - //promoting: update metadata -- { -- Mutex::Locker locker(m_cache_table_lock); -- m_cache_table.emplace(cache_file_name, PROMOTING); -- } -+ m_policy->update_status(cache_file_name, PROMOTING); -+ assert(PROMOTING == m_policy->get_status(cache_file_name)); - - librados::bufferlist* read_buf = new librados::bufferlist(); - int object_size = 4096*1024; //TODO(): read config from image metadata -@@ -83,42 +82,60 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) - cache_file.open(); - ret = cache_file.write_object_to_file(*read_buf, object_size); - -- assert(m_cache_table.find(cache_file_name) != m_cache_table.end()); -- - // update metadata -- { -- Mutex::Locker locker(m_cache_table_lock); -- m_cache_table.emplace(cache_file_name, PROMOTED); -- } -+ assert(PROMOTING == m_policy->get_status(cache_file_name)); -+ m_policy->update_status(cache_file_name, PROMOTED); -+ assert(PROMOTED == m_policy->get_status(cache_file_name)); - - return ret; - - } - -+// return -1, client need to read data from cluster. -+// return 0, client directly read data from cache. - int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { - - std::string cache_file_name = pool_name + object_name; -- { -- Mutex::Locker locker(m_cache_table_lock); -- -- auto it = m_cache_table.find(cache_file_name); -- if (it != m_cache_table.end()) { -- -- if (it->second == PROMOTING) { -- return -1; -- } else if (it->second == PROMOTED) { -- return 0; -- } else { -- assert(0); -- } -- } -+ -+ // TODO lookup and return status; -+ -+ CACHESTATUS ret; -+ ret = m_policy->lookup_object(cache_file_name); -+ -+ switch(ret) { -+ case NONE: -+ return do_promote(pool_name, object_name); -+ case PROMOTING: -+ return -1; -+ case PROMOTED: -+ return 0; -+ default: -+ return -1; - } -+} - -- int ret = do_promote(pool_name, object_name); -+void ObjectCacheStore::evict_thread_body() { -+ int ret; -+ while(m_evict_go) { -+ std::string temp_cache_file; - -- return ret; -+ ret = m_policy->evict_object(temp_cache_file); -+ if(ret == 0) { -+ continue; -+ } -+ -+ // TODO -+ // delete temp_cache_file file. -+ -+ assert(EVICTING == m_policy->get_status(temp_cache_file)); -+ -+ m_policy->update_status(temp_cache_file, EVICTED); -+ -+ assert(NONE == m_policy->get_status(temp_cache_file)); -+ } - } - -+ - int ObjectCacheStore::shutdown() { - m_rados->shutdown(); - return 0; -diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h -index db09efa..5118a73 100644 ---- a/src/tools/rbd_cache/ObjectCacheStore.h -+++ b/src/tools/rbd_cache/ObjectCacheStore.h -@@ -13,6 +13,7 @@ - #include "librbd/ImageCtx.h" - #include "librbd/ImageState.h" - #include "librbd/cache/SharedPersistentObjectCacherFile.h" -+#include "SimplePolicy.hpp" - - using librados::Rados; - using librados::IoCtx; -@@ -39,6 +40,8 @@ class ObjectCacheStore - - int lock_cache(std::string vol_name); - -+ void evict_thread_body(); -+ - private: - int _evict_object(); - -@@ -48,21 +51,18 @@ class ObjectCacheStore - librados::bufferlist* read_buf, - uint64_t length); - -- enum { -- PROMOTING = 0, -- PROMOTED, -- }; -- - CephContext *m_cct; - ContextWQ* m_work_queue; -- Mutex m_cache_table_lock; - RadosRef m_rados; - -- std::map<std::string, uint8_t> m_cache_table; - - std::map<std::string, librados::IoCtx*> m_ioctxs; - - librbd::cache::SyncFile *m_cache_file; -+ -+ Policy* m_policy; -+ -+ bool m_evict_go; - }; - - } // namespace rbd -diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp -new file mode 100644 -index 0000000..575c294 ---- /dev/null -+++ b/src/tools/rbd_cache/Policy.hpp -@@ -0,0 +1,22 @@ -+#ifndef RBD_CACHE_POLICY_HPP -+#define RBD_CACHE_POLICY_HPP -+ -+enum CACHESTATUS { -+ NONE = 0, -+ PROMOTING, -+ PROMOTED, -+ EVICTING, -+ EVICTED, -+}; -+ -+ -+class Policy { -+public: -+ Policy(){} -+ virtual ~Policy(){}; -+ virtual CACHESTATUS lookup_object(std::string) = 0; -+ virtual int evict_object(std::string&) = 0; -+ virtual void update_status(std::string, CACHESTATUS) = 0; -+ virtual CACHESTATUS get_status(std::string) = 0; -+}; -+#endif -diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp -new file mode 100644 -index 0000000..a0d8de7 ---- /dev/null -+++ b/src/tools/rbd_cache/SimplePolicy.hpp -@@ -0,0 +1,180 @@ -+#ifndef RBD_CACHE_SIMPLE_POLICY_HPP -+#define RBD_CACHE_SIMPLE_POLICY_HPP -+ -+#include "Policy.hpp" -+#include "include/lru.h" -+#include "common/Mutex.h" -+ -+#include <vector> -+#include <unordered_map> -+#include <string> -+ -+class SimplePolicy : public Policy { -+public: -+ SimplePolicy(uint64_t block_num, float level) -+ : m_level(level), -+ m_lock("SimplePolicy"), -+ m_entry_count(block_num) -+ { -+ -+ Entry m_entries[m_entry_count]; -+ -+ for(auto &entry : m_entries) { -+ m_free_lru.lru_insert_bot(&entry); -+ } -+ } -+ -+ ~SimplePolicy() {} -+ -+ CACHESTATUS lookup_object(std::string cache_file_name) { -+ Mutex::Locker locker(m_lock); -+ -+ auto entry_it = m_oid_to_entry.find(cache_file_name); -+ if(entry_it == m_oid_to_entry.end()) { -+ return NONE; -+ } -+ -+ Entry* entry = entry_it->second; -+ -+ LRU* lru; -+ if(entry->status == PROMOTED) { -+ lru = &m_promoted_lru; -+ } else { -+ lru = &m_handing_lru; -+ } -+ -+ // touch it -+ lru->lru_remove(entry); -+ lru->lru_insert_top(entry); -+ -+ return entry->status; -+ } -+ -+ int evict_object(std::string& out_cache_file_name) { -+ Mutex::Locker locker(m_lock); -+ -+ // still have enough free space, don't need to evict lru. -+ uint64_t temp_current_size = m_oid_to_entry.size(); -+ float temp_current_evict_level = temp_current_size / m_entry_count; -+ if(temp_current_evict_level < m_level) { -+ return 0; -+ } -+ -+ // when all entries are USING, PROMOTING or EVICTING, just busy waiting. -+ if(m_promoted_lru.lru_get_size() == 0) { -+ return 0; -+ } -+ -+ assert(m_promoted_lru.lru_get_size() != 0); -+ -+ // evict one item from promoted lru -+ Entry *entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_get_next_expire()); -+ assert(entry != nullptr); -+ -+ assert(entry->status == PROMOTED); -+ -+ out_cache_file_name = entry->cache_file_name; -+ entry->status = EVICTING; -+ -+ m_promoted_lru.lru_remove(entry); -+ m_handing_lru.lru_insert_top(entry); -+ -+ return 1; -+ } -+ -+ // TODO(): simplify the logic -+ void update_status(std::string _file_name, CACHESTATUS _status) { -+ Mutex::Locker locker(m_lock); -+ -+ Entry* entry; -+ auto entry_it = m_oid_to_entry.find(_file_name); -+ -+ // just check. -+ if(_status == PROMOTING) { -+ assert(m_oid_to_entry.find(_file_name) == m_oid_to_entry.end()); -+ } -+ -+ // miss this object. -+ if(entry_it == m_oid_to_entry.end() && _status == PROMOTING) { -+ entry = reinterpret_cast<Entry*>(m_free_lru.lru_get_next_expire()); -+ if(entry == nullptr) { -+ assert(0); // namely evict thread have some problems. -+ } -+ -+ entry->status = PROMOTING; -+ -+ m_oid_to_entry[_file_name] = entry; -+ m_free_lru.lru_remove(entry); -+ m_handing_lru.lru_insert_top(entry); -+ -+ return; -+ } -+ -+ assert(entry_it != m_oid_to_entry.end()); -+ -+ entry = entry_it->second; -+ -+ // promoting action have been finished, so update it. -+ if(entry->status == PROMOTING && _status== PROMOTED) { -+ m_handing_lru.lru_remove(entry); -+ m_promoted_lru.lru_insert_top(entry); -+ entry->status = PROMOTED; -+ return; -+ } -+ -+ // will delete this cache file -+ if(entry->status == PROMOTED && _status == EVICTING) { -+ m_promoted_lru.lru_remove(entry); -+ m_handing_lru.lru_insert_top(entry); -+ entry->status = EVICTING; -+ return; -+ } -+ -+ -+ if(_status == EVICTED) { -+ m_oid_to_entry.erase(entry_it); -+ m_handing_lru.lru_remove(entry); -+ m_free_lru.lru_insert_bot(entry); -+ return; -+ } -+ -+ assert(0); -+ } -+ -+ // get entry status -+ CACHESTATUS get_status(std::string _file_name) { -+ Mutex::Locker locker(m_lock); -+ auto entry_it = m_oid_to_entry.find(_file_name); -+ if(entry_it == m_oid_to_entry.end()) { -+ return NONE; -+ } -+ -+ return entry_it->second->status; -+ } -+ -+ -+private: -+ -+ class Entry : public LRUObject { -+ public: -+ CACHESTATUS status; -+ Entry() : status(NONE){} -+ std::string cache_file_name; -+ void encode(bufferlist &bl){} -+ void decode(bufferlist::iterator &it){} -+ }; -+ -+ std::unordered_map<std::string, Entry*> m_oid_to_entry; -+ -+ LRU m_free_lru; -+ LRU m_handing_lru; // include promoting status or evicting status -+ LRU m_promoted_lru; // include promoted, using status. -+ -+ mutable Mutex m_lock; -+ -+ float m_level; -+ uint64_t m_entry_count; -+ -+}; -+ -+#endif --- -2.7.4 - diff --git a/src/ceph/0007-librbd-cleanup-policy-based-promotion-eviction.patch b/src/ceph/0007-librbd-cleanup-policy-based-promotion-eviction.patch deleted file mode 100644 index 010407b..0000000 --- a/src/ceph/0007-librbd-cleanup-policy-based-promotion-eviction.patch +++ /dev/null @@ -1,512 +0,0 @@ -From dd4804fb05ad8aca51516b0112975cc91ef85a6b Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Wed, 8 Aug 2018 15:31:47 +0800 -Subject: [PATCH 07/10] librbd: cleanup policy based promotion/eviction - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/common/options.cc | 4 + - .../rbd_cache/CacheControllerSocketClient.hpp | 3 +- - src/tools/rbd_cache/ObjectCacheStore.cc | 63 +++---- - src/tools/rbd_cache/ObjectCacheStore.h | 10 +- - src/tools/rbd_cache/Policy.hpp | 18 +- - src/tools/rbd_cache/SimplePolicy.hpp | 188 +++++++++------------ - 6 files changed, 141 insertions(+), 145 deletions(-) - -diff --git a/src/common/options.cc b/src/common/options.cc -index 7839a31..b334c1e 100644 ---- a/src/common/options.cc -+++ b/src/common/options.cc -@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() { - .set_default("/tmp") - .set_description("shared ssd caching data dir"), - -+ Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED) -+ .set_default(4096) -+ .set_description("shared ssd caching data entries"), -+ - Option("rbd_non_blocking_aio", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) - .set_default(true) - .set_description("process AIO ops from a dispatch thread to prevent blocking"), -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -index 4e1f36c..56b79ce 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -@@ -90,7 +90,8 @@ public: - } - }); - std::unique_lock<std::mutex> lk(m); -- cv.wait(lk); -+ //cv.wait(lk); -+ cv.wait_for(lk, std::chrono::milliseconds(100)); - return 0; - } - -diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc -index b39fe66..99f90d6 100644 ---- a/src/tools/rbd_cache/ObjectCacheStore.cc -+++ b/src/tools/rbd_cache/ObjectCacheStore.cc -@@ -15,7 +15,12 @@ namespace cache { - ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) - : m_cct(cct), m_work_queue(work_queue), - m_rados(new librados::Rados()) { -- m_policy = new SimplePolicy(4096, 0.9); // TODO -+ -+ uint64_t object_cache_entries = -+ cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); -+ -+ //TODO(): allow to set level -+ m_policy = new SimplePolicy(object_cache_entries, 0.5); - } - - ObjectCacheStore::~ObjectCacheStore() { -@@ -35,7 +40,16 @@ int ObjectCacheStore::init(bool reset) { - lderr(m_cct) << "fail to conect to cluster" << dendl; - return ret; - } -- //TODO(): check existing cache objects -+ -+ std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); -+ //TODO(): check and reuse existing cache objects -+ if(reset) { -+ std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; -+ //TODO(): to use std::filesystem -+ int r = system(cmd.c_str()); -+ } -+ -+ evict_thd = new std::thread([this]{this->evict_thread_body();}); - return ret; - } - -@@ -58,10 +72,6 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) - - librados::IoCtx* ioctx = m_ioctxs[pool_name]; - -- //promoting: update metadata -- m_policy->update_status(cache_file_name, PROMOTING); -- assert(PROMOTING == m_policy->get_status(cache_file_name)); -- - librados::bufferlist* read_buf = new librados::bufferlist(); - int object_size = 4096*1024; //TODO(): read config from image metadata - -@@ -83,9 +93,9 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) - ret = cache_file.write_object_to_file(*read_buf, object_size); - - // update metadata -- assert(PROMOTING == m_policy->get_status(cache_file_name)); -- m_policy->update_status(cache_file_name, PROMOTED); -- assert(PROMOTED == m_policy->get_status(cache_file_name)); -+ assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); -+ m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); -+ assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); - - return ret; - -@@ -97,18 +107,15 @@ int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_na - - std::string cache_file_name = pool_name + object_name; - -- // TODO lookup and return status; -- - CACHESTATUS ret; - ret = m_policy->lookup_object(cache_file_name); - - switch(ret) { -- case NONE: -+ case OBJ_CACHE_NONE: - return do_promote(pool_name, object_name); -- case PROMOTING: -- return -1; -- case PROMOTED: -+ case OBJ_CACHE_PROMOTED: - return 0; -+ case OBJ_CACHE_PROMOTING: - default: - return -1; - } -@@ -117,26 +124,14 @@ int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_na - void ObjectCacheStore::evict_thread_body() { - int ret; - while(m_evict_go) { -- std::string temp_cache_file; -- -- ret = m_policy->evict_object(temp_cache_file); -- if(ret == 0) { -- continue; -- } -- -- // TODO -- // delete temp_cache_file file. -- -- assert(EVICTING == m_policy->get_status(temp_cache_file)); -- -- m_policy->update_status(temp_cache_file, EVICTED); -- -- assert(NONE == m_policy->get_status(temp_cache_file)); -+ ret = evict_objects(); - } - } - - - int ObjectCacheStore::shutdown() { -+ m_evict_go = false; -+ evict_thd->join(); - m_rados->shutdown(); - return 0; - } -@@ -165,5 +160,13 @@ int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_ - - } - -+int ObjectCacheStore::evict_objects() { -+ std::list<std::string> obj_list; -+ m_policy->get_evict_list(&obj_list); -+ for (auto& obj: obj_list) { -+ //do_evict(obj); -+ } -+} -+ - } // namespace cache - } // namespace rbd -diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h -index 5118a73..ba0e1f1 100644 ---- a/src/tools/rbd_cache/ObjectCacheStore.h -+++ b/src/tools/rbd_cache/ObjectCacheStore.h -@@ -15,6 +15,7 @@ - #include "librbd/cache/SharedPersistentObjectCacherFile.h" - #include "SimplePolicy.hpp" - -+ - using librados::Rados; - using librados::IoCtx; - -@@ -40,10 +41,9 @@ class ObjectCacheStore - - int lock_cache(std::string vol_name); - -- void evict_thread_body(); -- - private: -- int _evict_object(); -+ void evict_thread_body(); -+ int evict_objects(); - - int do_promote(std::string pool_name, std::string object_name); - -@@ -61,8 +61,8 @@ class ObjectCacheStore - librbd::cache::SyncFile *m_cache_file; - - Policy* m_policy; -- -- bool m_evict_go; -+ std::thread* evict_thd; -+ bool m_evict_go = false; - }; - - } // namespace rbd -diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp -index 575c294..711e3bd 100644 ---- a/src/tools/rbd_cache/Policy.hpp -+++ b/src/tools/rbd_cache/Policy.hpp -@@ -1,12 +1,16 @@ - #ifndef RBD_CACHE_POLICY_HPP - #define RBD_CACHE_POLICY_HPP - -+#include <list> -+#include <string> -+ -+namespace rbd { -+namespace cache { -+ - enum CACHESTATUS { -- NONE = 0, -- PROMOTING, -- PROMOTED, -- EVICTING, -- EVICTED, -+ OBJ_CACHE_NONE = 0, -+ OBJ_CACHE_PROMOTING, -+ OBJ_CACHE_PROMOTED, - }; - - -@@ -18,5 +22,9 @@ public: - virtual int evict_object(std::string&) = 0; - virtual void update_status(std::string, CACHESTATUS) = 0; - virtual CACHESTATUS get_status(std::string) = 0; -+ virtual void get_evict_list(std::list<std::string>* obj_list) = 0; - }; -+ -+} // namespace cache -+} // namespace rbd - #endif -diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp -index a0d8de7..e785de1 100644 ---- a/src/tools/rbd_cache/SimplePolicy.hpp -+++ b/src/tools/rbd_cache/SimplePolicy.hpp -@@ -3,138 +3,93 @@ - - #include "Policy.hpp" - #include "include/lru.h" -+#include "common/RWLock.h" - #include "common/Mutex.h" - - #include <vector> - #include <unordered_map> - #include <string> - -+namespace rbd { -+namespace cache { -+ -+ - class SimplePolicy : public Policy { - public: -- SimplePolicy(uint64_t block_num, float level) -- : m_level(level), -- m_lock("SimplePolicy"), -- m_entry_count(block_num) -+ SimplePolicy(uint64_t block_num, float watermark) -+ : m_watermark(watermark), m_entry_count(block_num), -+ m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), -+ m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") - { - -- Entry m_entries[m_entry_count]; -- -- for(auto &entry : m_entries) { -- m_free_lru.lru_insert_bot(&entry); -+ for(uint64_t i = 0; i < m_entry_count; i++) { -+ m_free_list.push_back(new Entry()); - } -+ - } - -- ~SimplePolicy() {} -+ ~SimplePolicy() { -+ for(uint64_t i = 0; i < m_entry_count; i++) { -+ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -+ delete entry; -+ m_free_list.pop_front(); -+ } -+ } - - CACHESTATUS lookup_object(std::string cache_file_name) { -- Mutex::Locker locker(m_lock); - -- auto entry_it = m_oid_to_entry.find(cache_file_name); -- if(entry_it == m_oid_to_entry.end()) { -- return NONE; -+ //TODO(): check race condition -+ RWLock::WLocker wlocker(m_cache_map_lock); -+ -+ auto entry_it = m_cache_map.find(cache_file_name); -+ if(entry_it == m_cache_map.end()) { -+ Mutex::Locker locker(m_free_list_lock); -+ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -+ assert(entry != nullptr); -+ m_free_list.pop_front(); -+ entry->status = OBJ_CACHE_PROMOTING; -+ -+ m_cache_map[cache_file_name] = entry; -+ -+ return OBJ_CACHE_NONE; - } - - Entry* entry = entry_it->second; - -- LRU* lru; -- if(entry->status == PROMOTED) { -- lru = &m_promoted_lru; -- } else { -- lru = &m_handing_lru; -+ if(entry->status == OBJ_CACHE_PROMOTED) { -+ // touch it -+ m_promoted_lru.lru_touch(entry); - } - -- // touch it -- lru->lru_remove(entry); -- lru->lru_insert_top(entry); -- - return entry->status; - } - - int evict_object(std::string& out_cache_file_name) { -- Mutex::Locker locker(m_lock); -- -- // still have enough free space, don't need to evict lru. -- uint64_t temp_current_size = m_oid_to_entry.size(); -- float temp_current_evict_level = temp_current_size / m_entry_count; -- if(temp_current_evict_level < m_level) { -- return 0; -- } -- -- // when all entries are USING, PROMOTING or EVICTING, just busy waiting. -- if(m_promoted_lru.lru_get_size() == 0) { -- return 0; -- } -- -- assert(m_promoted_lru.lru_get_size() != 0); -- -- // evict one item from promoted lru -- Entry *entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_get_next_expire()); -- assert(entry != nullptr); -- -- assert(entry->status == PROMOTED); -- -- out_cache_file_name = entry->cache_file_name; -- entry->status = EVICTING; -- -- m_promoted_lru.lru_remove(entry); -- m_handing_lru.lru_insert_top(entry); -+ RWLock::WLocker locker(m_cache_map_lock); - - return 1; - } - - // TODO(): simplify the logic -- void update_status(std::string _file_name, CACHESTATUS _status) { -- Mutex::Locker locker(m_lock); -+ void update_status(std::string file_name, CACHESTATUS new_status) { -+ RWLock::WLocker locker(m_cache_map_lock); - - Entry* entry; -- auto entry_it = m_oid_to_entry.find(_file_name); -+ auto entry_it = m_cache_map.find(file_name); - - // just check. -- if(_status == PROMOTING) { -- assert(m_oid_to_entry.find(_file_name) == m_oid_to_entry.end()); -+ if(new_status == OBJ_CACHE_PROMOTING) { -+ assert(entry_it == m_cache_map.end()); - } - -- // miss this object. -- if(entry_it == m_oid_to_entry.end() && _status == PROMOTING) { -- entry = reinterpret_cast<Entry*>(m_free_lru.lru_get_next_expire()); -- if(entry == nullptr) { -- assert(0); // namely evict thread have some problems. -- } -- -- entry->status = PROMOTING; -- -- m_oid_to_entry[_file_name] = entry; -- m_free_lru.lru_remove(entry); -- m_handing_lru.lru_insert_top(entry); -- -- return; -- } -- -- assert(entry_it != m_oid_to_entry.end()); -+ assert(entry_it != m_cache_map.end()); - - entry = entry_it->second; - -- // promoting action have been finished, so update it. -- if(entry->status == PROMOTING && _status== PROMOTED) { -- m_handing_lru.lru_remove(entry); -+ // promoting is done, so update it. -+ if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { - m_promoted_lru.lru_insert_top(entry); -- entry->status = PROMOTED; -- return; -- } -- -- // will delete this cache file -- if(entry->status == PROMOTED && _status == EVICTING) { -- m_promoted_lru.lru_remove(entry); -- m_handing_lru.lru_insert_top(entry); -- entry->status = EVICTING; -- return; -- } -- -- -- if(_status == EVICTED) { -- m_oid_to_entry.erase(entry_it); -- m_handing_lru.lru_remove(entry); -- m_free_lru.lru_insert_bot(entry); -+ entry->status = new_status; - return; - } - -@@ -142,39 +97,64 @@ public: - } - - // get entry status -- CACHESTATUS get_status(std::string _file_name) { -- Mutex::Locker locker(m_lock); -- auto entry_it = m_oid_to_entry.find(_file_name); -- if(entry_it == m_oid_to_entry.end()) { -- return NONE; -+ CACHESTATUS get_status(std::string file_name) { -+ RWLock::RLocker locker(m_cache_map_lock); -+ auto entry_it = m_cache_map.find(file_name); -+ if(entry_it == m_cache_map.end()) { -+ return OBJ_CACHE_NONE; - } - - return entry_it->second->status; - } - -+ void get_evict_list(std::list<std::string>* obj_list) { -+ RWLock::WLocker locker(m_cache_map_lock); -+ // check free ratio, pop entries from LRU -+ if (m_free_list.size() / m_entry_count < m_watermark) { -+ int evict_num = 10; //TODO(): make this configurable -+ for(int i = 0; i < evict_num; i++) { -+ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); -+ if (entry == nullptr) { -+ continue; -+ } -+ std::string file_name = entry->cache_file_name; -+ obj_list->push_back(file_name); -+ -+ auto entry_it = m_cache_map.find(file_name); -+ m_cache_map.erase(entry_it); -+ -+ //mark this entry as free -+ entry->status = OBJ_CACHE_NONE; -+ Mutex::Locker locker(m_free_list_lock); -+ m_free_list.push_back(entry); -+ } -+ } -+ } - - private: - - class Entry : public LRUObject { - public: - CACHESTATUS status; -- Entry() : status(NONE){} -+ Entry() : status(OBJ_CACHE_NONE){} - std::string cache_file_name; - void encode(bufferlist &bl){} - void decode(bufferlist::iterator &it){} - }; - -- std::unordered_map<std::string, Entry*> m_oid_to_entry; -+ float m_watermark; -+ uint64_t m_entry_count; - -- LRU m_free_lru; -- LRU m_handing_lru; // include promoting status or evicting status -- LRU m_promoted_lru; // include promoted, using status. -+ std::unordered_map<std::string, Entry*> m_cache_map; -+ RWLock m_cache_map_lock; - -- mutable Mutex m_lock; -+ std::deque<Entry*> m_free_list; -+ Mutex m_free_list_lock; - -- float m_level; -- uint64_t m_entry_count; -+ LRU m_promoted_lru; // include promoted, using status. - - }; - -+} // namespace cache -+} // namespace rbd - #endif --- -2.7.4 - diff --git a/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch b/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch deleted file mode 100644 index 16dd41f..0000000 --- a/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch +++ /dev/null @@ -1,366 +0,0 @@ -From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Thu, 16 Aug 2018 17:28:46 +0800 -Subject: [PATCH 08/10] librbd: implement async cache lookup and read - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - .../SharedPersistentObjectCacherObjectDispatch.cc | 63 ++++++++++++---------- - .../SharedPersistentObjectCacherObjectDispatch.h | 7 +++ - src/tools/rbd_cache/CacheController.cc | 9 ++-- - src/tools/rbd_cache/CacheControllerSocket.hpp | 8 ++- - .../rbd_cache/CacheControllerSocketClient.hpp | 49 +++++++++-------- - src/tools/rbd_cache/CacheControllerSocketCommon.h | 12 +++++ - 6 files changed, 94 insertions(+), 54 deletions(-) - -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -index 2aa5cad..407ce49 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -@@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjec - - template <typename I> - SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { -- if (m_object_store) { - delete m_object_store; -- } -- -- if (m_cache_client) { - delete m_cache_client; -- } - } - - template <typename I> -@@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read( - ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" - << object_len << dendl; - -- // ensure we aren't holding the cache lock post-read - on_dispatched = util::create_async_context_callback(*m_image_ctx, - on_dispatched); -+ auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { -+ handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); -+ }); - - if (m_cache_client && m_cache_client->connected && m_object_store) { -- bool exists; - m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), -- m_image_ctx->id, oid, &exists); -- -- // try to read from parent image -- ldout(cct, 20) << "SRO cache object exists:" << exists << dendl; -- if (exists) { -- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -- if (r != 0) { -- *dispatch_result = io::DISPATCH_RESULT_COMPLETE; -- on_dispatched->complete(r); -- return true; -- } -- } -+ m_image_ctx->id, oid, ctx); - } -- -- ldout(cct, 20) << "Continue read from RADOS" << dendl; -- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -- on_dispatched->complete(0); - return true; - } - - template <typename I> -+int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( -+ bool cache, -+ const std::string &oid, uint64_t object_off, uint64_t object_len, -+ ceph::bufferlist* read_data, io::DispatchResult* dispatch_result, -+ Context* on_dispatched) { -+ // IO chained in reverse order -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << dendl; -+ -+ // try to read from parent image -+ if (cache) { -+ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -+ if (r != 0) { -+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; -+ //TODO(): complete in syncfile -+ on_dispatched->complete(r); -+ ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl; -+ return true; -+ } -+ } else { -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl; -+ return false; -+ } -+} -+ -+template <typename I> - void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { - auto cct = m_image_ctx->cct; - ldout(cct, 20) << dendl; -@@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s - rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); - - switch (io_ctx->type) { -- case RBDSC_REGISTER_REPLY: { -+ case rbd::cache::RBDSC_REGISTER_REPLY: { - // open cache handler for volume - ldout(cct, 20) << "SRO cache client open cache handler" << dendl; - m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); - - break; - } -- case RBDSC_READ_REPLY: { -+ case rbd::cache::RBDSC_READ_REPLY: { - ldout(cct, 20) << "SRO cache client start to read cache" << dendl; - //TODO(): should call read here - - break; - } -- case RBDSC_READ_RADOS: { -+ case rbd::cache::RBDSC_READ_RADOS: { - ldout(cct, 20) << "SRO cache client start to read rados" << dendl; - //TODO(): should call read here - - break; - } -- default: ldout(cct, 20) << "nothing" << dendl; -+ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; - break; - - } -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -index 200688f..36b868a 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -@@ -112,6 +112,13 @@ public: - - private: - -+ int handle_read_cache( -+ bool cache, -+ const std::string &oid, uint64_t object_off, -+ uint64_t object_len, ceph::bufferlist* read_data, -+ io::DispatchResult* dispatch_result, -+ Context* on_dispatched); -+ - ImageCtxT* m_image_ctx; - - void client_handle_request(std::string msg); -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -index cefcf28..c9d674b 100644 ---- a/src/tools/rbd_cache/CacheController.cc -+++ b/src/tools/rbd_cache/CacheController.cc -@@ -76,7 +76,7 @@ void CacheController::run() { - } - } - --void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ -+void CacheController::handle_request(uint64_t session_id, std::string msg){ - rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); - - int ret = 0; -@@ -86,7 +86,7 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ - // init cache layout for volume - m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); - io_ctx->type = RBDSC_REGISTER_REPLY; -- m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); -+ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); - - break; - } -@@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ - } else { - io_ctx->type = RBDSC_READ_REPLY; - } -- m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); -+ if (io_ctx->type != RBDSC_READ_REPLY) { -+ assert(0); -+ } -+ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); - - break; - } -diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp -index 967af1d..d178b58 100644 ---- a/src/tools/rbd_cache/CacheControllerSocket.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp -@@ -43,7 +43,9 @@ public: - void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { - - if (!error) { -- -+ if(bytes_transferred != 544){ -+ assert(0); -+ } - process_msg(session_id, std::string(data_, bytes_transferred)); - - } -@@ -51,7 +53,8 @@ public: - - void handle_write(const boost::system::error_code& error) { - if (!error) { -- socket_.async_read_some(boost::asio::buffer(data_), -+ boost::asio::async_read(socket_, boost::asio::buffer(data_), -+ boost::asio::transfer_exactly(544), - boost::bind(&session::handle_read, - shared_from_this(), - boost::asio::placeholders::error, -@@ -63,6 +66,7 @@ public: - - boost::asio::async_write(socket_, - boost::asio::buffer(msg.c_str(), msg.size()), -+ boost::asio::transfer_exactly(544), - boost::bind(&session::handle_write, - shared_from_this(), - boost::asio::placeholders::error)); -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -index 56b79ce..3b0ca00 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -@@ -8,6 +8,7 @@ - #include <boost/bind.hpp> - #include <boost/algorithm/string.hpp> - #include "include/assert.h" -+#include "include/Context.h" - #include "CacheControllerSocketCommon.h" - - -@@ -26,8 +27,12 @@ public: - m_client_process_msg(processmsg), - ep_(stream_protocol::endpoint(file)) - { -- std::thread thd([this](){io_service_.run(); }); -- thd.detach(); -+ io_thread.reset(new std::thread([this](){io_service_.run(); })); -+ } -+ -+ ~CacheClient() { -+ io_service_.stop(); -+ io_thread->join(); - } - - void run(){ -@@ -53,7 +58,8 @@ public: - message->offset = 0; - message->length = 0; - boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -- [this](const boost::system::error_code& err, size_t cb) { -+ [this, message](const boost::system::error_code& err, size_t cb) { -+ delete message; - if (!err) { - boost::asio::async_read(socket_, boost::asio::buffer(buffer_), - boost::asio::transfer_exactly(544), -@@ -72,7 +78,7 @@ public: - return 0; - } - -- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) { -+ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { - rbdsc_req_type_t *message = new rbdsc_req_type_t(); - message->type = RBDSC_READ; - memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -@@ -82,49 +88,48 @@ public: - message->length = 0; - - boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -- [this, result](const boost::system::error_code& err, size_t cb) { -+ [this, on_finish, message](const boost::system::error_code& err, size_t cb) { -+ delete message; - if (!err) { -- get_result(result); -+ get_result(on_finish); - } else { - return -1; - } - }); -- std::unique_lock<std::mutex> lk(m); -- //cv.wait(lk); -- cv.wait_for(lk, std::chrono::milliseconds(100)); -+ - return 0; - } - -- void get_result(bool* result) { -+ void get_result(Context* on_finish) { - boost::asio::async_read(socket_, boost::asio::buffer(buffer_), - boost::asio::transfer_exactly(544), -- [this, result](const boost::system::error_code& err, size_t cb) { -+ [this, on_finish](const boost::system::error_code& err, size_t cb) { -+ if (cb != 544) { -+ assert(0); -+ } - if (!err) { - rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); - if (io_ctx->type == RBDSC_READ_REPLY) { -- *result = true; -+ on_finish->complete(true); -+ return; - } else { -- *result = false; -+ on_finish->complete(false); -+ return; - } -- cv.notify_one(); -- m_client_process_msg(std::string(buffer_, cb)); - } else { -- return -1; -+ assert(0); -+ return on_finish->complete(false); - } - }); - } - -- void handle_connect(const boost::system::error_code& error) { -- //TODO(): open librbd snap -- } -- -- void handle_write(const boost::system::error_code& error) { -- } - - private: - boost::asio::io_service& io_service_; - boost::asio::io_service::work io_service_work_; - stream_protocol::socket socket_; -+ -+ std::shared_ptr<std::thread> io_thread; - ClientProcessMsg m_client_process_msg; - stream_protocol::endpoint ep_; - char buffer_[1024]; -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -index a9d73a8..e026ec8 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h -+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h -@@ -4,6 +4,7 @@ - #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H - #define CACHE_CONTROLLER_SOCKET_COMMON_H - -+/* - #define RBDSC_REGISTER 0X11 - #define RBDSC_READ 0X12 - #define RBDSC_LOOKUP 0X13 -@@ -11,10 +12,21 @@ - #define RBDSC_READ_REPLY 0X15 - #define RBDSC_LOOKUP_REPLY 0X16 - #define RBDSC_READ_RADOS 0X17 -+*/ - - namespace rbd { - namespace cache { - -+static const int RBDSC_REGISTER = 0X11; -+static const int RBDSC_READ = 0X12; -+static const int RBDSC_LOOKUP = 0X13; -+static const int RBDSC_REGISTER_REPLY = 0X14; -+static const int RBDSC_READ_REPLY = 0X15; -+static const int RBDSC_LOOKUP_REPLY = 0X16; -+static const int RBDSC_READ_RADOS = 0X17; -+ -+ -+ - typedef std::function<void(uint64_t, std::string)> ProcessMsg; - typedef std::function<void(std::string)> ClientProcessMsg; - typedef uint8_t rbdsc_req_type; --- -2.7.4 - diff --git a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch deleted file mode 100644 index 4adec93..0000000 --- a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch +++ /dev/null @@ -1,767 +0,0 @@ -From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Wed, 5 Sep 2018 14:40:54 +0800 -Subject: [PATCH 09/10] librbd: clean up on rbd shared cache - -Signed-off-by: Dehao Shang <dehao.shang@intel.com> -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/common/options.cc | 4 + - .../SharedPersistentObjectCacherObjectDispatch.cc | 28 ++- - .../SharedPersistentObjectCacherObjectDispatch.h | 3 +- - src/tools/rbd_cache/CacheController.cc | 11 +- - src/tools/rbd_cache/CacheController.h | 1 - - src/tools/rbd_cache/CacheControllerSocket.hpp | 213 +++++++++++++------ - .../rbd_cache/CacheControllerSocketClient.hpp | 226 ++++++++++++++------- - src/tools/rbd_cache/CacheControllerSocketCommon.h | 2 + - 8 files changed, 340 insertions(+), 148 deletions(-) - -diff --git a/src/common/options.cc b/src/common/options.cc -index b334c1e..3172744 100644 ---- a/src/common/options.cc -+++ b/src/common/options.cc -@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() { - .set_default("/tmp") - .set_description("shared ssd caching data dir"), - -+ Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED) -+ .set_default("/tmp/rbd_shared_ro_cache_sock") -+ .set_description("shared ssd caching domain socket"), -+ - Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED) - .set_default(4096) - .set_description("shared ssd caching data entries"), -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -index 407ce49..7cbc019 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -@@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje - delete m_cache_client; - } - -+// TODO if connect fails, init will return error to high layer. - template <typename I> - void SharedPersistentObjectCacherObjectDispatch<I>::init() { - auto cct = m_image_ctx->cct; -@@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() { - return; - } - -- ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; -+ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; - -- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -- m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(), -- ([&](std::string s){client_handle_request(s);})); -+ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); -+ m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(), -+ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); - - int ret = m_cache_client->connect(); - if (ret < 0) { -@@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read( - io::ExtentMap* extent_map, int* object_dispatch_flags, - io::DispatchResult* dispatch_result, Context** on_finish, - Context* on_dispatched) { -+ - // IO chained in reverse order -+ -+ // Now, policy is : when session have any error, later all read will dispatched to rados layer. -+ if(!m_cache_client->is_session_work()) { -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ return true; -+ // TODO : domain socket have error, all read operation will dispatched to rados layer. -+ } -+ - auto cct = m_image_ctx->cct; - ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" - << object_len << dendl; - -+ - on_dispatched = util::create_async_context_callback(*m_image_ctx, - on_dispatched); - auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { - handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); - }); - -- if (m_cache_client && m_cache_client->connected && m_object_store) { -+ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { - m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), - m_image_ctx->id, oid, ctx); - } -@@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( - // try to read from parent image - if (cache) { - int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -+ //int r = object_len; - if (r != 0) { - *dispatch_result = io::DISPATCH_RESULT_COMPLETE; - //TODO(): complete in syncfile -@@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( - return false; - } - } -- - template <typename I> - void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { - auto cct = m_image_ctx->cct; -@@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s - - switch (io_ctx->type) { - case rbd::cache::RBDSC_REGISTER_REPLY: { -- // open cache handler for volume -+ // open cache handler for volume - ldout(cct, 20) << "SRO cache client open cache handler" << dendl; - m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); - -@@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s - } - default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; - break; -- -+ - } - } - -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -index 36b868a..5685244 100644 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -@@ -118,12 +118,11 @@ private: - uint64_t object_len, ceph::bufferlist* read_data, - io::DispatchResult* dispatch_result, - Context* on_dispatched); -+ void client_handle_request(std::string msg); - - ImageCtxT* m_image_ctx; - -- void client_handle_request(std::string msg); - rbd::cache::CacheClient *m_cache_client = nullptr; -- boost::asio::io_service io_service; - }; - - } // namespace cache -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -index c9d674b..620192c 100644 ---- a/src/tools/rbd_cache/CacheController.cc -+++ b/src/tools/rbd_cache/CacheController.cc -@@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){} - void CacheController::run() { - try { - //TODO(): use new socket path -- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -+ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); - std::remove(controller_path.c_str()); - -- m_cache_server = new CacheServer(io_service, controller_path, -- ([&](uint64_t p, std::string s){handle_request(p, s);})); -- io_service.run(); -+ m_cache_server = new CacheServer(controller_path, -+ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); -+ m_cache_server->run(); - } catch (std::exception& e) { - std::cerr << "Exception: " << e.what() << "\n"; - } -@@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){ - - break; - } -- -+ std::cout<<"can't recongize request"<<std::endl; -+ assert(0); // TODO replace it. - } - } - -diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h -index 0e3abc1..0e23484 100644 ---- a/src/tools/rbd_cache/CacheController.h -+++ b/src/tools/rbd_cache/CacheController.h -@@ -41,7 +41,6 @@ class CacheController { - void handle_request(uint64_t sesstion_id, std::string msg); - - private: -- boost::asio::io_service io_service; - CacheServer *m_cache_server; - std::vector<const char*> m_args; - CephContext *m_cct; -diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp -index d178b58..2ff7477 100644 ---- a/src/tools/rbd_cache/CacheControllerSocket.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp -@@ -11,6 +11,7 @@ - #include <string> - #include <boost/bind.hpp> - #include <boost/asio.hpp> -+#include <boost/asio/error.hpp> - #include <boost/algorithm/string.hpp> - #include "CacheControllerSocketCommon.h" - -@@ -23,110 +24,202 @@ namespace cache { - class session : public std::enable_shared_from_this<session> { - public: - session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) -- : session_id(session_id), socket_(io_service), process_msg(processmsg) {} -+ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {} - - stream_protocol::socket& socket() { -- return socket_; -+ return m_dm_socket; - } - - void start() { -- -- boost::asio::async_read(socket_, boost::asio::buffer(data_), -- boost::asio::transfer_exactly(544), -+ if(true) { -+ serial_handing_request(); -+ } else { -+ parallel_handing_request(); -+ } -+ } -+ // flow: -+ // -+ // recv request --> process request --> reply ack -+ // | | -+ // --------------<------------------------- -+ void serial_handing_request() { -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), - boost::bind(&session::handle_read, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ } - -+ // flow : -+ // -+ // --> thread 1: process request -+ // recv request --> thread 2: process request --> reply ack -+ // --> thread n: process request -+ // -+ void parallel_handing_request() { -+ // TODO - } - -+private: -+ - void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { -+ // when recv eof, the most proble is that client side close socket. -+ // so, server side need to end handing_request -+ if(error == boost::asio::error::eof) { -+ std::cout<<"session: async_read : " << error.message() << std::endl; -+ return; -+ } - -- if (!error) { -- if(bytes_transferred != 544){ -- assert(0); -- } -- process_msg(session_id, std::string(data_, bytes_transferred)); -+ if(error) { -+ std::cout<<"session: async_read fails: " << error.message() << std::endl; -+ assert(0); -+ } - -+ if(bytes_transferred != RBDSC_MSG_LEN) { -+ std::cout<<"session : request in-complete. "<<std::endl; -+ assert(0); - } -+ -+ // TODO async_process can increse coding readable. -+ // process_msg_callback call handle async_send -+ process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); - } - -- void handle_write(const boost::system::error_code& error) { -- if (!error) { -- boost::asio::async_read(socket_, boost::asio::buffer(data_), -- boost::asio::transfer_exactly(544), -- boost::bind(&session::handle_read, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -+ void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { -+ if (error) { -+ std::cout<<"session: async_write fails: " << error.message() << std::endl; -+ assert(0); - } -+ -+ if(bytes_transferred != RBDSC_MSG_LEN) { -+ std::cout<<"session : reply in-complete. "<<std::endl; -+ assert(0); -+ } -+ -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ boost::bind(&session::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ - } - -+public: - void send(std::string msg) { -- -- boost::asio::async_write(socket_, -+ boost::asio::async_write(m_dm_socket, - boost::asio::buffer(msg.c_str(), msg.size()), -- boost::asio::transfer_exactly(544), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), - boost::bind(&session::handle_write, -- shared_from_this(), -- boost::asio::placeholders::error)); -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); - - } - - private: -- uint64_t session_id; -- stream_protocol::socket socket_; -+ uint64_t m_session_id; -+ stream_protocol::socket m_dm_socket; - ProcessMsg process_msg; - - // Buffer used to store data received from the client. - //std::array<char, 1024> data_; -- char data_[1024]; -+ char m_buffer[1024]; - }; - - typedef std::shared_ptr<session> session_ptr; - - class CacheServer { - public: -- CacheServer(boost::asio::io_service& io_service, -- const std::string& file, ProcessMsg processmsg) -- : io_service_(io_service), -- server_process_msg(processmsg), -- acceptor_(io_service, stream_protocol::endpoint(file)) -- { -- session_ptr new_session(new session(session_id, io_service_, server_process_msg)); -- acceptor_.async_accept(new_session->socket(), -- boost::bind(&CacheServer::handle_accept, this, new_session, -- boost::asio::placeholders::error)); -- } -- -- void handle_accept(session_ptr new_session, -- const boost::system::error_code& error) -- { -- //TODO(): open librbd snap -- if (!error) { -- new_session->start(); -- session_map.emplace(session_id, new_session); -- session_id++; -- new_session.reset(new session(session_id, io_service_, server_process_msg)); -- acceptor_.async_accept(new_session->socket(), -- boost::bind(&CacheServer::handle_accept, this, new_session, -- boost::asio::placeholders::error)); -+ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) -+ : m_cct(cct), m_server_process_msg(processmsg), -+ m_local_path(file), -+ m_acceptor(m_io_service) -+ {} -+ -+ void run() { -+ bool ret; -+ ret = start_accept(); -+ if(!ret) { -+ return; - } -+ m_io_service.run(); - } - -+ // TODO : use callback to replace this function. - void send(uint64_t session_id, std::string msg) { -- auto it = session_map.find(session_id); -- if (it != session_map.end()) { -+ auto it = m_session_map.find(session_id); -+ if (it != m_session_map.end()) { - it->second->send(msg); -+ } else { -+ // TODO : why don't find existing session id ? -+ std::cout<<"don't find session id..."<<std::endl; -+ assert(0); -+ } -+ } -+ -+private: -+ // when creating one acceptor, can control every step in this way. -+ bool start_accept() { -+ boost::system::error_code ec; -+ m_acceptor.open(m_local_path.protocol(), ec); -+ if(ec) { -+ std::cout << "m_acceptor open fails: " << ec.message() << std::endl; -+ return false; -+ } -+ -+ // TODO control acceptor attribute. -+ -+ m_acceptor.bind(m_local_path, ec); -+ if(ec) { -+ std::cout << "m_acceptor bind fails: " << ec.message() << std::endl; -+ return false; -+ } -+ -+ m_acceptor.listen(boost::asio::socket_base::max_connections, ec); -+ if(ec) { -+ std::cout << "m_acceptor listen fails: " << ec.message() << std::endl; -+ return false; - } -+ -+ accept(); -+ return true; -+ } -+ -+ void accept() { -+ session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg)); -+ m_acceptor.async_accept(new_session->socket(), -+ boost::bind(&CacheServer::handle_accept, this, new_session, -+ boost::asio::placeholders::error)); -+ } -+ -+ void handle_accept(session_ptr new_session, const boost::system::error_code& error) { -+ //TODO(): open librbd snap ... yuan -+ -+ if(error) { -+ std::cout << "async accept fails : " << error.message() << std::endl; -+ assert(0); // TODO -+ } -+ -+ // must put session into m_session_map at the front of session.start() -+ m_session_map.emplace(m_session_id, new_session); -+ // TODO : session setting -+ new_session->start(); -+ m_session_id++; -+ -+ // lanuch next accept -+ accept(); - } - - private: -- boost::asio::io_service& io_service_; -- ProcessMsg server_process_msg; -- stream_protocol::acceptor acceptor_; -- uint64_t session_id = 1; -- std::map<uint64_t, session_ptr> session_map; -+ CephContext* m_cct; -+ boost::asio::io_service m_io_service; // TODO wrapper it. -+ ProcessMsg m_server_process_msg; -+ stream_protocol::endpoint m_local_path; -+ stream_protocol::acceptor m_acceptor; -+ uint64_t m_session_id = 1; -+ std::map<uint64_t, session_ptr> m_session_map; - }; - - } // namespace cache -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -index 3b0ca00..964f888 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp -+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -@@ -4,9 +4,12 @@ - #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H - #define CACHE_CONTROLLER_SOCKET_CLIENT_H - -+#include <atomic> - #include <boost/asio.hpp> - #include <boost/bind.hpp> -+#include <boost/asio/error.hpp> - #include <boost/algorithm/string.hpp> -+#include "librbd/ImageCtx.h" - #include "include/assert.h" - #include "include/Context.h" - #include "CacheControllerSocketCommon.h" -@@ -19,32 +22,64 @@ namespace cache { - - class CacheClient { - public: -- CacheClient(boost::asio::io_service& io_service, -- const std::string& file, ClientProcessMsg processmsg) -- : io_service_(io_service), -- io_service_work_(io_service), -- socket_(io_service), -+ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) -+ : m_io_service_work(m_io_service), -+ m_dm_socket(m_io_service), - m_client_process_msg(processmsg), -- ep_(stream_protocol::endpoint(file)) -+ m_ep(stream_protocol::endpoint(file)), -+ m_session_work(false), -+ cct(ceph_ctx) - { -- io_thread.reset(new std::thread([this](){io_service_.run(); })); -+ // TODO wrapper io_service -+ std::thread thd([this](){ -+ m_io_service.run();}); -+ thd.detach(); - } - -- ~CacheClient() { -- io_service_.stop(); -- io_thread->join(); -+ void run(){ - } - -- void run(){ -- } -+ bool is_session_work() { -+ return m_session_work.load() == true; -+ } -+ -+ // just when error occur, call this method. -+ void close() { -+ m_session_work.store(false); -+ boost::system::error_code close_ec; -+ m_dm_socket.close(close_ec); -+ if(close_ec) { -+ std::cout << "close: " << close_ec.message() << std::endl; -+ } -+ std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl; -+ } - - int connect() { -- try { -- socket_.connect(ep_); -- } catch (std::exception& e) { -+ boost::system::error_code ec; -+ m_dm_socket.connect(m_ep, ec); -+ if(ec) { -+ if(ec == boost::asio::error::connection_refused) { -+ std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. " -+ << "Now data will be read from ceph cluster " << std::endl; -+ } else { -+ std::cout << "connect: " << ec.message() << std::endl; -+ } -+ -+ if(m_dm_socket.is_open()) { -+ // Set to indicate what error occurred, if any. -+ // Note that, even if the function indicates an error, -+ // the underlying descriptor is closed. -+ boost::system::error_code close_ec; -+ m_dm_socket.close(close_ec); -+ if(close_ec) { -+ std::cout << "close: " << close_ec.message() << std::endl; -+ } -+ } - return -1; - } -- connected = true; -+ -+ std::cout<<"connect success"<<std::endl; -+ - return 0; - } - -@@ -57,27 +92,51 @@ public: - message->vol_size = vol_size; - message->offset = 0; - message->length = 0; -- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -- [this, message](const boost::system::error_code& err, size_t cb) { -- delete message; -- if (!err) { -- boost::asio::async_read(socket_, boost::asio::buffer(buffer_), -- boost::asio::transfer_exactly(544), -- [this](const boost::system::error_code& err, size_t cb) { -- if (!err) { -- m_client_process_msg(std::string(buffer_, cb)); -- } else { -- return -1; -- } -- }); -- } else { -- return -1; -- } -- }); -+ -+ uint64_t ret; -+ boost::system::error_code ec; -+ -+ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); -+ if(ec) { -+ std::cout << "write fails : " << ec.message() << std::endl; -+ return -1; -+ } -+ -+ if(ret != message->size()) { -+ std::cout << "write fails : ret != send_bytes "<< std::endl; -+ return -1; -+ } -+ -+ // hard code TODO -+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); -+ if(ec == boost::asio::error::eof) { -+ std::cout<< "recv eof"<<std::endl; -+ return -1; -+ } -+ -+ if(ec) { -+ std::cout << "write fails : " << ec.message() << std::endl; -+ return -1; -+ } -+ -+ if(ret != RBDSC_MSG_LEN) { -+ std::cout << "write fails : ret != receive bytes " << std::endl; -+ return -1; -+ } -+ -+ m_client_process_msg(std::string(m_recv_buffer, ret)); -+ -+ delete message; -+ -+ std::cout << "register volume success" << std::endl; -+ -+ // TODO -+ m_session_work.store(true); - - return 0; - } - -+ // if occur any error, we just return false. Then read from rados. - int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { - rbdsc_req_type_t *message = new rbdsc_req_type_t(); - message->type = RBDSC_READ; -@@ -87,59 +146,82 @@ public: - message->offset = 0; - message->length = 0; - -- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -+ boost::asio::async_write(m_dm_socket, -+ boost::asio::buffer((char*)message, message->size()), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), - [this, on_finish, message](const boost::system::error_code& err, size_t cb) { -- delete message; -- if (!err) { -+ delete message; -+ if(err) { -+ std::cout<< "lookup_object: async_write fails." << err.message() << std::endl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ if(cb != RBDSC_MSG_LEN) { -+ std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } - get_result(on_finish); -- } else { -- return -1; -- } - }); - - return 0; - } - - void get_result(Context* on_finish) { -- boost::asio::async_read(socket_, boost::asio::buffer(buffer_), -- boost::asio::transfer_exactly(544), -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), - [this, on_finish](const boost::system::error_code& err, size_t cb) { -- if (cb != 544) { -- assert(0); -- } -- if (!err) { -- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); -- if (io_ctx->type == RBDSC_READ_REPLY) { -- on_finish->complete(true); -- return; -- } else { -- on_finish->complete(false); -- return; -- } -- } else { -- assert(0); -- return on_finish->complete(false); -- } -+ if(err == boost::asio::error::eof) { -+ std::cout<<"get_result: ack is EOF." << std::endl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ if(err) { -+ std::cout<< "get_result: async_read fails:" << err.message() << std::endl; -+ close(); -+ on_finish->complete(false); // TODO replace this assert with some metohds. -+ return; -+ } -+ if (cb != RBDSC_MSG_LEN) { -+ close(); -+ std::cout << "get_result: in-complete ack." << std::endl; -+ on_finish->complete(false); // TODO: replace this assert with some methods. -+ } -+ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); -+ -+ // TODO: re-occur yuan's bug -+ if(io_ctx->type == RBDSC_READ) { -+ std::cout << "get rbdsc_read... " << std::endl; -+ assert(0); -+ } -+ -+ if (io_ctx->type == RBDSC_READ_REPLY) { -+ on_finish->complete(true); -+ return; -+ } else { -+ on_finish->complete(false); -+ return; -+ } - }); - } - -- - private: -- boost::asio::io_service& io_service_; -- boost::asio::io_service::work io_service_work_; -- stream_protocol::socket socket_; -- -- std::shared_ptr<std::thread> io_thread; -+ boost::asio::io_service m_io_service; -+ boost::asio::io_service::work m_io_service_work; -+ stream_protocol::socket m_dm_socket; - ClientProcessMsg m_client_process_msg; -- stream_protocol::endpoint ep_; -- char buffer_[1024]; -- int block_size_ = 1024; -- -- std::condition_variable cv; -- std::mutex m; -- --public: -- bool connected = false; -+ stream_protocol::endpoint m_ep; -+ char m_recv_buffer[1024]; -+ -+ // atomic modfiy for this variable. -+ // thread 1 : asio callback thread modify it. -+ // thread 2 : librbd read it. -+ std::atomic<bool> m_session_work; -+ CephContext* cct; - }; - - } // namespace cache -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -index e026ec8..e17529a 100644 ---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h -+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h -@@ -55,6 +55,8 @@ struct rbdsc_req_type_t { - } - }; - -+static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); -+ - } // namespace cache - } // namespace rbd - #endif --- -2.7.4 - diff --git a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch deleted file mode 100644 index 46040e6..0000000 --- a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch +++ /dev/null @@ -1,3510 +0,0 @@ -From 9a097084d06e7186206b43d9c81d1f648791d7a4 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Fri, 7 Sep 2018 08:29:51 +0800 -Subject: [PATCH 10/10] librbd: new namespace ceph immutable obj cache - -clean up class/func names to use the new namespace - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/common/options.cc | 2 +- - src/common/subsys.h | 3 +- - src/librbd/CMakeLists.txt | 3 +- - .../SharedPersistentObjectCacherObjectDispatch.cc | 175 ---------------- - .../SharedPersistentObjectCacherObjectDispatch.h | 133 ------------ - src/librbd/cache/SharedReadOnlyObjectDispatch.cc | 170 +++++++++++++++ - src/librbd/cache/SharedReadOnlyObjectDispatch.h | 126 ++++++++++++ - src/librbd/image/OpenRequest.cc | 4 +- - src/tools/CMakeLists.txt | 2 +- - .../ceph_immutable_object_cache/CMakeLists.txt | 11 + - .../ceph_immutable_object_cache/CacheClient.cc | 205 ++++++++++++++++++ - .../ceph_immutable_object_cache/CacheClient.h | 53 +++++ - .../ceph_immutable_object_cache/CacheController.cc | 117 +++++++++++ - .../ceph_immutable_object_cache/CacheController.h | 53 +++++ - .../ceph_immutable_object_cache/CacheServer.cc | 99 +++++++++ - .../ceph_immutable_object_cache/CacheServer.h | 54 +++++ - .../ceph_immutable_object_cache/CacheSession.cc | 115 +++++++++++ - .../ceph_immutable_object_cache/CacheSession.h | 58 ++++++ - .../ObjectCacheStore.cc | 172 ++++++++++++++++ - .../ceph_immutable_object_cache/ObjectCacheStore.h | 70 +++++++ - src/tools/ceph_immutable_object_cache/Policy.hpp | 33 +++ - .../ceph_immutable_object_cache/SimplePolicy.hpp | 163 +++++++++++++++ - .../ceph_immutable_object_cache/SocketCommon.h | 54 +++++ - src/tools/ceph_immutable_object_cache/main.cc | 85 ++++++++ - src/tools/rbd_cache/CMakeLists.txt | 9 - - src/tools/rbd_cache/CacheController.cc | 116 ----------- - src/tools/rbd_cache/CacheController.h | 54 ----- - src/tools/rbd_cache/CacheControllerSocket.hpp | 228 -------------------- - .../rbd_cache/CacheControllerSocketClient.hpp | 229 --------------------- - src/tools/rbd_cache/CacheControllerSocketCommon.h | 62 ------ - src/tools/rbd_cache/ObjectCacheStore.cc | 172 ---------------- - src/tools/rbd_cache/ObjectCacheStore.h | 70 ------- - src/tools/rbd_cache/Policy.hpp | 30 --- - src/tools/rbd_cache/SimplePolicy.hpp | 160 -------------- - src/tools/rbd_cache/main.cc | 85 -------- - 35 files changed, 1646 insertions(+), 1529 deletions(-) - delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc - delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h - create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.cc - create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.h - create mode 100644 src/tools/ceph_immutable_object_cache/CMakeLists.txt - create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.cc - create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.h - create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.cc - create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.h - create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.cc - create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.h - create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.cc - create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.h - create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc - create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.h - create mode 100644 src/tools/ceph_immutable_object_cache/Policy.hpp - create mode 100644 src/tools/ceph_immutable_object_cache/SimplePolicy.hpp - create mode 100644 src/tools/ceph_immutable_object_cache/SocketCommon.h - create mode 100644 src/tools/ceph_immutable_object_cache/main.cc - delete mode 100644 src/tools/rbd_cache/CMakeLists.txt - delete mode 100644 src/tools/rbd_cache/CacheController.cc - delete mode 100644 src/tools/rbd_cache/CacheController.h - delete mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp - delete mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp - delete mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h - delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc - delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.h - delete mode 100644 src/tools/rbd_cache/Policy.hpp - delete mode 100644 src/tools/rbd_cache/SimplePolicy.hpp - delete mode 100644 src/tools/rbd_cache/main.cc - -diff --git a/src/common/options.cc b/src/common/options.cc -index 3172744..bf00aab 100644 ---- a/src/common/options.cc -+++ b/src/common/options.cc -@@ -6358,7 +6358,7 @@ static std::vector<Option> get_rbd_options() { - .set_description("time in seconds for detecting a hung thread"), - - Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) -- .set_default(true) -+ .set_default(false) - .set_description("whether to enable shared ssd caching"), - - Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED) -diff --git a/src/common/subsys.h b/src/common/subsys.h -index bdd2d0e..5b532c1 100644 ---- a/src/common/subsys.h -+++ b/src/common/subsys.h -@@ -36,9 +36,10 @@ SUBSYS(objecter, 0, 1) - SUBSYS(rados, 0, 5) - SUBSYS(rbd, 0, 5) - SUBSYS(rbd_mirror, 0, 5) --SUBSYS(rbd_replay, 0, 5) - SUBSYS(journaler, 0, 5) - SUBSYS(objectcacher, 0, 5) -+SUBSYS(immutable_obj_cache, 0, 5) -+SUBSYS(rbd_replay, 0, 5) - SUBSYS(client, 0, 5) - SUBSYS(osd, 1, 5) - SUBSYS(optracker, 0, 5) -diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt -index 540ee78..c9bfb6f 100644 ---- a/src/librbd/CMakeLists.txt -+++ b/src/librbd/CMakeLists.txt -@@ -32,7 +32,7 @@ set(librbd_internal_srcs - api/Snapshot.cc - cache/ImageWriteback.cc - cache/ObjectCacherObjectDispatch.cc -- cache/SharedPersistentObjectCacherObjectDispatch.cc -+ cache/SharedReadOnlyObjectDispatch.cc - cache/SharedPersistentObjectCacher.cc - cache/SharedPersistentObjectCacherFile.cc - deep_copy/ImageCopyRequest.cc -@@ -125,6 +125,7 @@ set(librbd_internal_srcs - trash/MoveRequest.cc - watcher/Notifier.cc - watcher/RewatchRequest.cc -+ ${CMAKE_SOURCE_DIR}/src/tools/ceph_immutable_object_cache/CacheClient.cc - ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) - - add_library(rbd_api STATIC librbd.cc) -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -deleted file mode 100644 -index 7cbc019..0000000 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -+++ /dev/null -@@ -1,175 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h" --#include "common/WorkQueue.h" --#include "librbd/ImageCtx.h" --#include "librbd/Journal.h" --#include "librbd/Utils.h" --#include "librbd/LibrbdWriteback.h" --#include "librbd/io/ObjectDispatchSpec.h" --#include "librbd/io/ObjectDispatcher.h" --#include "librbd/io/Utils.h" --#include "osd/osd_types.h" --#include "osdc/WritebackHandler.h" --#include <vector> -- --#define dout_subsys ceph_subsys_rbd --#undef dout_prefix --#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \ -- << this << " " << __func__ << ": " -- --namespace librbd { --namespace cache { -- --template <typename I> --SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch( -- I* image_ctx) : m_image_ctx(image_ctx) { --} -- --template <typename I> --SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { -- delete m_object_store; -- delete m_cache_client; --} -- --// TODO if connect fails, init will return error to high layer. --template <typename I> --void SharedPersistentObjectCacherObjectDispatch<I>::init() { -- auto cct = m_image_ctx->cct; -- ldout(cct, 5) << dendl; -- -- if (m_image_ctx->parent != nullptr) { -- //TODO(): should we cover multi-leveled clone? -- ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; -- return; -- } -- -- ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; -- -- std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); -- m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(), -- ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); -- -- int ret = m_cache_client->connect(); -- if (ret < 0) { -- ldout(cct, 5) << "SRO cache client fail to connect with local controller: " -- << "please start rbd-cache daemon" -- << dendl; -- } else { -- ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " -- << "name = " << m_image_ctx->id -- << dendl; -- -- ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), -- m_image_ctx->id, m_image_ctx->size); -- -- if (ret >= 0) { -- // add ourself to the IO object dispatcher chain -- m_image_ctx->io_object_dispatcher->register_object_dispatch(this); -- } -- } --} -- --template <typename I> --bool SharedPersistentObjectCacherObjectDispatch<I>::read( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- uint64_t object_len, librados::snap_t snap_id, int op_flags, -- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -- io::ExtentMap* extent_map, int* object_dispatch_flags, -- io::DispatchResult* dispatch_result, Context** on_finish, -- Context* on_dispatched) { -- -- // IO chained in reverse order -- -- // Now, policy is : when session have any error, later all read will dispatched to rados layer. -- if(!m_cache_client->is_session_work()) { -- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -- on_dispatched->complete(0); -- return true; -- // TODO : domain socket have error, all read operation will dispatched to rados layer. -- } -- -- auto cct = m_image_ctx->cct; -- ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" -- << object_len << dendl; -- -- -- on_dispatched = util::create_async_context_callback(*m_image_ctx, -- on_dispatched); -- auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { -- handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); -- }); -- -- if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { -- m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), -- m_image_ctx->id, oid, ctx); -- } -- return true; --} -- --template <typename I> --int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( -- bool cache, -- const std::string &oid, uint64_t object_off, uint64_t object_len, -- ceph::bufferlist* read_data, io::DispatchResult* dispatch_result, -- Context* on_dispatched) { -- // IO chained in reverse order -- auto cct = m_image_ctx->cct; -- ldout(cct, 20) << dendl; -- -- // try to read from parent image -- if (cache) { -- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -- //int r = object_len; -- if (r != 0) { -- *dispatch_result = io::DISPATCH_RESULT_COMPLETE; -- //TODO(): complete in syncfile -- on_dispatched->complete(r); -- ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl; -- return true; -- } -- } else { -- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -- on_dispatched->complete(0); -- ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl; -- return false; -- } --} --template <typename I> --void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { -- auto cct = m_image_ctx->cct; -- ldout(cct, 20) << dendl; -- -- rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); -- -- switch (io_ctx->type) { -- case rbd::cache::RBDSC_REGISTER_REPLY: { -- // open cache handler for volume -- ldout(cct, 20) << "SRO cache client open cache handler" << dendl; -- m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); -- -- break; -- } -- case rbd::cache::RBDSC_READ_REPLY: { -- ldout(cct, 20) << "SRO cache client start to read cache" << dendl; -- //TODO(): should call read here -- -- break; -- } -- case rbd::cache::RBDSC_READ_RADOS: { -- ldout(cct, 20) << "SRO cache client start to read rados" << dendl; -- //TODO(): should call read here -- -- break; -- } -- default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; -- break; -- -- } --} -- --} // namespace cache --} // namespace librbd -- --template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -deleted file mode 100644 -index 5685244..0000000 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -+++ /dev/null -@@ -1,133 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H --#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -- --#include "librbd/io/ObjectDispatchInterface.h" --#include "common/Mutex.h" --#include "osdc/ObjectCacher.h" --#include "tools/rbd_cache/CacheControllerSocketClient.hpp" --#include "SharedPersistentObjectCacher.h" -- --struct WritebackHandler; -- --namespace librbd { -- --class ImageCtx; -- --namespace cache { -- --/** -- * Facade around the OSDC object cacher to make it align with -- * the object dispatcher interface -- */ --template <typename ImageCtxT = ImageCtx> --class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface { --public: -- static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) { -- return new SharedPersistentObjectCacherObjectDispatch(image_ctx); -- } -- -- SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx); -- ~SharedPersistentObjectCacherObjectDispatch() override; -- -- io::ObjectDispatchLayer get_object_dispatch_layer() const override { -- return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; -- } -- -- void init(); -- void shut_down(Context* on_finish) { -- m_image_ctx->op_work_queue->queue(on_finish, 0); -- } -- -- bool read( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- uint64_t object_len, librados::snap_t snap_id, int op_flags, -- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -- io::ExtentMap* extent_map, int* object_dispatch_flags, -- io::DispatchResult* dispatch_result, Context** on_finish, -- Context* on_dispatched) override; -- -- bool discard( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- uint64_t object_len, const ::SnapContext &snapc, int discard_flags, -- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -- uint64_t* journal_tid, io::DispatchResult* dispatch_result, -- Context** on_finish, Context* on_dispatched) { -- return false; -- } -- -- bool write( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -- uint64_t* journal_tid, io::DispatchResult* dispatch_result, -- Context** on_finish, Context* on_dispatched) { -- return false; -- } -- -- bool write_same( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- uint64_t object_len, io::Extents&& buffer_extents, -- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -- uint64_t* journal_tid, io::DispatchResult* dispatch_result, -- Context** on_finish, Context* on_dispatched) { -- return false; -- } -- -- bool compare_and_write( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, -- const ::SnapContext &snapc, int op_flags, -- const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, -- int* object_dispatch_flags, uint64_t* journal_tid, -- io::DispatchResult* dispatch_result, Context** on_finish, -- Context* on_dispatched) { -- return false; -- } -- -- bool flush( -- io::FlushSource flush_source, const ZTracer::Trace &parent_trace, -- io::DispatchResult* dispatch_result, Context** on_finish, -- Context* on_dispatched) { -- return false; -- } -- -- bool invalidate_cache(Context* on_finish) { -- return false; -- } -- -- bool reset_existence_cache(Context* on_finish) { -- return false; -- } -- -- void extent_overwritten( -- uint64_t object_no, uint64_t object_off, uint64_t object_len, -- uint64_t journal_tid, uint64_t new_journal_tid) { -- } -- -- SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; -- --private: -- -- int handle_read_cache( -- bool cache, -- const std::string &oid, uint64_t object_off, -- uint64_t object_len, ceph::bufferlist* read_data, -- io::DispatchResult* dispatch_result, -- Context* on_dispatched); -- void client_handle_request(std::string msg); -- -- ImageCtxT* m_image_ctx; -- -- rbd::cache::CacheClient *m_cache_client = nullptr; --}; -- --} // namespace cache --} // namespace librbd -- --extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; -- --#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H -diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.cc b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc -new file mode 100644 -index 0000000..23c7dbe ---- /dev/null -+++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc -@@ -0,0 +1,170 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/WorkQueue.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/Journal.h" -+#include "librbd/Utils.h" -+#include "librbd/LibrbdWriteback.h" -+#include "librbd/io/ObjectDispatchSpec.h" -+#include "librbd/io/ObjectDispatcher.h" -+#include "librbd/io/Utils.h" -+#include "librbd/cache/SharedReadOnlyObjectDispatch.h" -+#include "osd/osd_types.h" -+#include "osdc/WritebackHandler.h" -+ -+#include <vector> -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::cache::SharedReadOnlyObjectDispatch: " \ -+ << this << " " << __func__ << ": " -+ -+namespace librbd { -+namespace cache { -+ -+template <typename I> -+SharedReadOnlyObjectDispatch<I>::SharedReadOnlyObjectDispatch( -+ I* image_ctx) : m_image_ctx(image_ctx) { -+} -+ -+template <typename I> -+SharedReadOnlyObjectDispatch<I>::~SharedReadOnlyObjectDispatch() { -+ delete m_object_store; -+ delete m_cache_client; -+} -+ -+// TODO if connect fails, init will return error to high layer. -+template <typename I> -+void SharedReadOnlyObjectDispatch<I>::init() { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 5) << dendl; -+ -+ if (m_image_ctx->parent != nullptr) { -+ //TODO(): should we cover multi-leveled clone? -+ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; -+ return; -+ } -+ -+ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; -+ -+ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); -+ m_cache_client = new ceph::immutable_obj_cache::CacheClient(controller_path.c_str(), -+ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); -+ -+ int ret = m_cache_client->connect(); -+ if (ret < 0) { -+ ldout(cct, 5) << "SRO cache client fail to connect with local controller: " -+ << "please start rbd-cache daemon" -+ << dendl; -+ } else { -+ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " -+ << "name = " << m_image_ctx->id -+ << dendl; -+ -+ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), -+ m_image_ctx->id, m_image_ctx->size); -+ -+ if (ret >= 0) { -+ // add ourself to the IO object dispatcher chain -+ m_image_ctx->io_object_dispatcher->register_object_dispatch(this); -+ } -+ } -+} -+ -+template <typename I> -+bool SharedReadOnlyObjectDispatch<I>::read( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, librados::snap_t snap_id, int op_flags, -+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -+ io::ExtentMap* extent_map, int* object_dispatch_flags, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" -+ << object_len << dendl; -+ -+ // if any session fails, later reads will go to rados -+ if(!m_cache_client->is_session_work()) { -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ return true; -+ // TODO(): fix domain socket error -+ } -+ -+ auto ctx = new FunctionContext([this, oid, object_off, object_len, -+ read_data, dispatch_result, on_dispatched](bool cache) { -+ handle_read_cache(cache, oid, object_off, object_len, -+ read_data, dispatch_result, on_dispatched); -+ }); -+ -+ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { -+ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), -+ m_image_ctx->id, oid, ctx); -+ } -+ return true; -+} -+ -+template <typename I> -+int SharedReadOnlyObjectDispatch<I>::handle_read_cache( -+ bool cache, const std::string &oid, uint64_t object_off, -+ uint64_t object_len, ceph::bufferlist* read_data, -+ io::DispatchResult* dispatch_result, Context* on_dispatched) { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << dendl; -+ -+ // try to read from parent image -+ if (cache) { -+ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -+ //int r = object_len; -+ if (r != 0) { -+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; -+ //TODO(): complete in syncfile -+ on_dispatched->complete(r); -+ ldout(cct, 20) << "read cache: " << *dispatch_result <<dendl; -+ return true; -+ } -+ } else { -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ ldout(cct, 20) << "read rados: " << *dispatch_result <<dendl; -+ return false; -+ } -+} -+template <typename I> -+void SharedReadOnlyObjectDispatch<I>::client_handle_request(std::string msg) { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << dendl; -+ -+ ceph::immutable_obj_cache::rbdsc_req_type_t *io_ctx = (ceph::immutable_obj_cache::rbdsc_req_type_t*)(msg.c_str()); -+ -+ switch (io_ctx->type) { -+ case ceph::immutable_obj_cache::RBDSC_REGISTER_REPLY: { -+ // open cache handler for volume -+ ldout(cct, 20) << "SRO cache client open cache handler" << dendl; -+ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); -+ -+ break; -+ } -+ case ceph::immutable_obj_cache::RBDSC_READ_REPLY: { -+ ldout(cct, 20) << "SRO cache client start to read cache" << dendl; -+ //TODO(): should call read here -+ -+ break; -+ } -+ case ceph::immutable_obj_cache::RBDSC_READ_RADOS: { -+ ldout(cct, 20) << "SRO cache client start to read rados" << dendl; -+ //TODO(): should call read here -+ -+ break; -+ } -+ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; -+ break; -+ -+ } -+} -+ -+} // namespace cache -+} // namespace librbd -+ -+template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>; -diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.h b/src/librbd/cache/SharedReadOnlyObjectDispatch.h -new file mode 100644 -index 0000000..9b56da9 ---- /dev/null -+++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.h -@@ -0,0 +1,126 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -+#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -+ -+#include "common/Mutex.h" -+#include "SharedPersistentObjectCacher.h" -+#include "librbd/io/ObjectDispatchInterface.h" -+#include "tools/ceph_immutable_object_cache/CacheClient.h" -+ -+ -+namespace librbd { -+ -+class ImageCtx; -+ -+namespace cache { -+ -+template <typename ImageCtxT = ImageCtx> -+class SharedReadOnlyObjectDispatch : public io::ObjectDispatchInterface { -+public: -+ static SharedReadOnlyObjectDispatch* create(ImageCtxT* image_ctx) { -+ return new SharedReadOnlyObjectDispatch(image_ctx); -+ } -+ -+ SharedReadOnlyObjectDispatch(ImageCtxT* image_ctx); -+ ~SharedReadOnlyObjectDispatch() override; -+ -+ io::ObjectDispatchLayer get_object_dispatch_layer() const override { -+ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; -+ } -+ -+ void init(); -+ void shut_down(Context* on_finish) { -+ m_image_ctx->op_work_queue->queue(on_finish, 0); -+ } -+ -+ bool read( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, librados::snap_t snap_id, int op_flags, -+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -+ io::ExtentMap* extent_map, int* object_dispatch_flags, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) override; -+ -+ bool discard( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool write( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool write_same( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, io::Extents&& buffer_extents, -+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool compare_and_write( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, -+ const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, -+ int* object_dispatch_flags, uint64_t* journal_tid, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ return false; -+ } -+ -+ bool flush( -+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ return false; -+ } -+ -+ bool invalidate_cache(Context* on_finish) { -+ return false; -+ } -+ -+ bool reset_existence_cache(Context* on_finish) { -+ return false; -+ } -+ -+ void extent_overwritten( -+ uint64_t object_no, uint64_t object_off, uint64_t object_len, -+ uint64_t journal_tid, uint64_t new_journal_tid) { -+ } -+ -+private: -+ -+ int handle_read_cache( -+ bool cache, -+ const std::string &oid, uint64_t object_off, -+ uint64_t object_len, ceph::bufferlist* read_data, -+ io::DispatchResult* dispatch_result, -+ Context* on_dispatched); -+ void client_handle_request(std::string msg); -+ -+ ImageCtxT* m_image_ctx; -+ -+ ceph::immutable_obj_cache::CacheClient *m_cache_client = nullptr; -+ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; -+}; -+ -+} // namespace cache -+} // namespace librbd -+ -+extern template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>; -+ -+#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H -diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc -index 30a7b66..57ce92f 100644 ---- a/src/librbd/image/OpenRequest.cc -+++ b/src/librbd/image/OpenRequest.cc -@@ -8,7 +8,7 @@ - #include "librbd/ImageCtx.h" - #include "librbd/Utils.h" - #include "librbd/cache/ObjectCacherObjectDispatch.h" --#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc" -+#include "librbd/cache/SharedReadOnlyObjectDispatch.cc" - #include "librbd/image/CloseRequest.h" - #include "librbd/image/RefreshRequest.h" - #include "librbd/image/SetSnapRequest.h" -@@ -457,7 +457,7 @@ Context *OpenRequest<I>::send_init_cache(int *result) { - // enable Shared Read-only cache for parent image - if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) { - ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl; -- auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx); -+ auto sro_cache = cache::SharedReadOnlyObjectDispatch<I>::create(m_image_ctx); - sro_cache->init(); - } - -diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt -index 72ab342..f7c5872 100644 ---- a/src/tools/CMakeLists.txt -+++ b/src/tools/CMakeLists.txt -@@ -99,7 +99,6 @@ endif(WITH_CEPHFS) - if(WITH_RBD) - add_subdirectory(rbd) - add_subdirectory(rbd_mirror) -- add_subdirectory(rbd_cache) - if(LINUX) - add_subdirectory(rbd_nbd) - endif() -@@ -108,4 +107,5 @@ if(WITH_RBD) - endif() - endif(WITH_RBD) - -+add_subdirectory(ceph_immutable_object_cache) - add_subdirectory(ceph-dencoder) -diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt -new file mode 100644 -index 0000000..c7c7af3 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CMakeLists.txt -@@ -0,0 +1,11 @@ -+add_executable(ceph-immutable-object-cache -+ ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc -+ ObjectCacheStore.cc -+ CacheController.cc -+ CacheServer.cc -+ CacheSession.cc -+ main.cc) -+target_link_libraries(ceph-immutable-object-cache -+ librados -+ global) -+install(TARGETS ceph-immutable-object-cache DESTINATION bin) -diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc -new file mode 100644 -index 0000000..a7116bf ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheClient.cc -@@ -0,0 +1,205 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "CacheClient.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \ -+ << __func__ << ": " -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+ CacheClient::CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) -+ : m_io_service_work(m_io_service), -+ m_dm_socket(m_io_service), -+ m_client_process_msg(processmsg), -+ m_ep(stream_protocol::endpoint(file)), -+ m_session_work(false), -+ cct(ceph_ctx) -+ { -+ // TODO wrapper io_service -+ std::thread thd([this](){m_io_service.run();}); -+ thd.detach(); -+ } -+ -+ void CacheClient::run(){ -+ } -+ -+ bool CacheClient::is_session_work() { -+ return m_session_work.load() == true; -+ } -+ -+ // just when error occur, call this method. -+ void CacheClient::close() { -+ m_session_work.store(false); -+ boost::system::error_code close_ec; -+ m_dm_socket.close(close_ec); -+ if(close_ec) { -+ ldout(cct, 20) << "close: " << close_ec.message() << dendl; -+ } -+ ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl; -+ } -+ -+ int CacheClient::connect() { -+ boost::system::error_code ec; -+ m_dm_socket.connect(m_ep, ec); -+ if(ec) { -+ if(ec == boost::asio::error::connection_refused) { -+ ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. " -+ << "Now data will be read from ceph cluster " << dendl; -+ } else { -+ ldout(cct, 20) << "connect: " << ec.message() << dendl; -+ } -+ -+ if(m_dm_socket.is_open()) { -+ // Set to indicate what error occurred, if any. -+ // Note that, even if the function indicates an error, -+ // the underlying descriptor is closed. -+ boost::system::error_code close_ec; -+ m_dm_socket.close(close_ec); -+ if(close_ec) { -+ ldout(cct, 20) << "close: " << close_ec.message() << dendl; -+ } -+ } -+ return -1; -+ } -+ -+ ldout(cct, 20) <<"connect success"<< dendl; -+ -+ return 0; -+ } -+ -+ int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { -+ // cache controller will init layout -+ rbdsc_req_type_t *message = new rbdsc_req_type_t(); -+ message->type = RBDSC_REGISTER; -+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -+ memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); -+ message->vol_size = vol_size; -+ message->offset = 0; -+ message->length = 0; -+ -+ uint64_t ret; -+ boost::system::error_code ec; -+ -+ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); -+ if(ec) { -+ ldout(cct, 20) << "write fails : " << ec.message() << dendl; -+ return -1; -+ } -+ -+ if(ret != message->size()) { -+ ldout(cct, 20) << "write fails : ret != send_bytes " << dendl; -+ return -1; -+ } -+ -+ // hard code TODO -+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); -+ if(ec == boost::asio::error::eof) { -+ ldout(cct, 20) << "recv eof" << dendl; -+ return -1; -+ } -+ -+ if(ec) { -+ ldout(cct, 20) << "write fails : " << ec.message() << dendl; -+ return -1; -+ } -+ -+ if(ret != RBDSC_MSG_LEN) { -+ ldout(cct, 20) << "write fails : ret != receive bytes " << dendl; -+ return -1; -+ } -+ -+ m_client_process_msg(std::string(m_recv_buffer, ret)); -+ -+ delete message; -+ -+ ldout(cct, 20) << "register volume success" << dendl; -+ -+ // TODO -+ m_session_work.store(true); -+ -+ return 0; -+ } -+ -+ // if occur any error, we just return false. Then read from rados. -+ int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { -+ rbdsc_req_type_t *message = new rbdsc_req_type_t(); -+ message->type = RBDSC_READ; -+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -+ memcpy(message->vol_name, object_id.c_str(), object_id.size()); -+ message->vol_size = 0; -+ message->offset = 0; -+ message->length = 0; -+ -+ boost::asio::async_write(m_dm_socket, -+ boost::asio::buffer((char*)message, message->size()), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ [this, on_finish, message](const boost::system::error_code& err, size_t cb) { -+ delete message; -+ if(err) { -+ ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ if(cb != RBDSC_MSG_LEN) { -+ ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ get_result(on_finish); -+ }); -+ -+ return 0; -+ } -+ -+ void CacheClient::get_result(Context* on_finish) { -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ [this, on_finish](const boost::system::error_code& err, size_t cb) { -+ if(err == boost::asio::error::eof) { -+ ldout(cct, 20) << "get_result: ack is EOF." << dendl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ if(err) { -+ ldout(cct, 20) << "get_result: async_read fails:" << err.message() << dendl; -+ close(); -+ on_finish->complete(false); // TODO replace this assert with some metohds. -+ return; -+ } -+ if (cb != RBDSC_MSG_LEN) { -+ close(); -+ ldout(cct, 20) << "get_result: in-complete ack." << dendl; -+ on_finish->complete(false); // TODO: replace this assert with some methods. -+ } -+ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); -+ -+ // TODO: re-occur yuan's bug -+ if(io_ctx->type == RBDSC_READ) { -+ ldout(cct, 20) << "get rbdsc_read... " << dendl; -+ assert(0); -+ } -+ -+ if (io_ctx->type == RBDSC_READ_REPLY) { -+ on_finish->complete(true); -+ return; -+ } else { -+ on_finish->complete(false); -+ return; -+ } -+ }); -+ } -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h -new file mode 100644 -index 0000000..d82ab8f ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheClient.h -@@ -0,0 +1,53 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_CLIENT_H -+#define CEPH_CACHE_CLIENT_H -+ -+#include <atomic> -+#include <boost/asio.hpp> -+#include <boost/bind.hpp> -+#include <boost/asio/error.hpp> -+#include <boost/algorithm/string.hpp> -+#include "librbd/ImageCtx.h" -+#include "include/assert.h" -+#include "include/Context.h" -+#include "SocketCommon.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class CacheClient { -+public: -+ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx); -+ void run(); -+ bool is_session_work(); -+ -+ void close(); -+ int connect(); -+ -+ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size); -+ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish); -+ void get_result(Context* on_finish); -+ -+private: -+ boost::asio::io_service m_io_service; -+ boost::asio::io_service::work m_io_service_work; -+ stream_protocol::socket m_dm_socket; -+ ClientProcessMsg m_client_process_msg; -+ stream_protocol::endpoint m_ep; -+ char m_recv_buffer[1024]; -+ -+ // atomic modfiy for this variable. -+ // thread 1 : asio callback thread modify it. -+ // thread 2 : librbd read it. -+ std::atomic<bool> m_session_work; -+ CephContext* cct; -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc -new file mode 100644 -index 0000000..cb636d2 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheController.cc -@@ -0,0 +1,117 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "CacheController.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \ -+ << __func__ << ": " -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class ThreadPoolSingleton : public ThreadPool { -+public: -+ ContextWQ *op_work_queue; -+ -+ explicit ThreadPoolSingleton(CephContext *cct) -+ : ThreadPool(cct, "ceph::cache::thread_pool", "tp_librbd_cache", 32, -+ "pcache_threads"), -+ op_work_queue(new ContextWQ("ceph::pcache_op_work_queue", -+ cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), -+ this)) { -+ start(); -+ } -+ ~ThreadPoolSingleton() override { -+ op_work_queue->drain(); -+ delete op_work_queue; -+ -+ stop(); -+ } -+}; -+ -+ -+CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): -+ m_args(args), m_cct(cct) { -+ -+} -+ -+CacheController::~CacheController() { -+ -+} -+ -+int CacheController::init() { -+ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( -+ "ceph::cache::thread_pool", false, m_cct); -+ pcache_op_work_queue = thread_pool_singleton->op_work_queue; -+ -+ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); -+ //TODO(): make this configurable -+ int r = m_object_cache_store->init(true); -+ if (r < 0) { -+ lderr(m_cct) << "init error\n" << dendl; -+ } -+ return r; -+} -+ -+int CacheController::shutdown() { -+ int r = m_object_cache_store->shutdown(); -+ return r; -+} -+ -+void CacheController::handle_signal(int signum){} -+ -+void CacheController::run() { -+ try { -+ //TODO(): use new socket path -+ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); -+ std::remove(controller_path.c_str()); -+ -+ m_cache_server = new CacheServer(controller_path, -+ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); -+ m_cache_server->run(); -+ } catch (std::exception& e) { -+ lderr(m_cct) << "Exception: " << e.what() << dendl; -+ } -+} -+ -+void CacheController::handle_request(uint64_t session_id, std::string msg){ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -+ -+ int ret = 0; -+ -+ switch (io_ctx->type) { -+ case RBDSC_REGISTER: { -+ // init cache layout for volume -+ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); -+ io_ctx->type = RBDSC_REGISTER_REPLY; -+ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); -+ -+ break; -+ } -+ case RBDSC_READ: { -+ // lookup object in local cache store -+ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); -+ if (ret < 0) { -+ io_ctx->type = RBDSC_READ_RADOS; -+ } else { -+ io_ctx->type = RBDSC_READ_REPLY; -+ } -+ if (io_ctx->type != RBDSC_READ_REPLY) { -+ assert(0); -+ } -+ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); -+ -+ break; -+ } -+ ldout(m_cct, 5) << "can't recongize request" << dendl; -+ assert(0); // TODO replace it. -+ } -+} -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -+ -diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h -new file mode 100644 -index 0000000..837fe36 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheController.h -@@ -0,0 +1,53 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_CONTROLLER_H -+#define CEPH_CACHE_CONTROLLER_H -+ -+#include "common/Formatter.h" -+#include "common/admin_socket.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "common/WorkQueue.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "include/assert.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+#include "CacheServer.h" -+#include "ObjectCacheStore.h" -+ -+#include <thread> -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class CacheController { -+ public: -+ CacheController(CephContext *cct, const std::vector<const char*> &args); -+ ~CacheController(); -+ -+ int init(); -+ -+ int shutdown(); -+ -+ void handle_signal(int sinnum); -+ -+ void run(); -+ -+ void handle_request(uint64_t sesstion_id, std::string msg); -+ -+ private: -+ CacheServer *m_cache_server; -+ std::vector<const char*> m_args; -+ CephContext *m_cct; -+ ObjectCacheStore *m_object_cache_store; -+ ContextWQ* pcache_op_work_queue; -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc -new file mode 100644 -index 0000000..dd2d47e ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheServer.cc -@@ -0,0 +1,99 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/debug.h" -+#include "common/ceph_context.h" -+#include "CacheServer.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \ -+ << __func__ << ": " -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+CacheServer::CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) -+ : cct(cct), m_server_process_msg(processmsg), -+ m_local_path(file), m_acceptor(m_io_service) {} -+ -+CacheServer::~CacheServer(){} -+ -+void CacheServer::run() { -+ bool ret; -+ ret = start_accept(); -+ if(!ret) { -+ return; -+ } -+ m_io_service.run(); -+} -+ -+// TODO : use callback to replace this function. -+void CacheServer::send(uint64_t session_id, std::string msg) { -+ auto it = m_session_map.find(session_id); -+ if (it != m_session_map.end()) { -+ it->second->send(msg); -+ } else { -+ // TODO : why don't find existing session id ? -+ ldout(cct, 20) << "don't find session id..." << dendl; -+ assert(0); -+ } -+} -+ -+// when creating one acceptor, can control every step in this way. -+bool CacheServer::start_accept() { -+ boost::system::error_code ec; -+ m_acceptor.open(m_local_path.protocol(), ec); -+ if(ec) { -+ ldout(cct, 20) << "m_acceptor open fails: " << ec.message() << dendl; -+ return false; -+ } -+ -+ // TODO control acceptor attribute. -+ -+ m_acceptor.bind(m_local_path, ec); -+ if(ec) { -+ ldout(cct, 20) << "m_acceptor bind fails: " << ec.message() << dendl; -+ return false; -+ } -+ -+ m_acceptor.listen(boost::asio::socket_base::max_connections, ec); -+ if(ec) { -+ ldout(cct, 20) << "m_acceptor listen fails: " << ec.message() << dendl; -+ return false; -+ } -+ -+ accept(); -+ return true; -+} -+ -+void CacheServer::accept() { -+ CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct)); -+ m_acceptor.async_accept(new_session->socket(), -+ boost::bind(&CacheServer::handle_accept, this, new_session, -+ boost::asio::placeholders::error)); -+} -+ -+void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) { -+ -+ if(error) { -+ lderr(cct) << "async accept fails : " << error.message() << dendl; -+ assert(0); // TODO -+ } -+ -+ m_session_map.emplace(m_session_id, new_session); -+ // TODO : session setting -+ new_session->start(); -+ m_session_id++; -+ -+ // lanuch next accept -+ accept(); -+} -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h -new file mode 100644 -index 0000000..6c5c133 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheServer.h -@@ -0,0 +1,54 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_SERVER_H -+#define CEPH_CACHE_SERVER_H -+ -+#include <cstdio> -+#include <iostream> -+#include <array> -+#include <memory> -+#include <string> -+#include <boost/bind.hpp> -+#include <boost/asio.hpp> -+#include <boost/asio/error.hpp> -+#include <boost/algorithm/string.hpp> -+ -+#include "include/assert.h" -+#include "SocketCommon.h" -+#include "CacheSession.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class CacheServer { -+ -+ public: -+ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct); -+ ~CacheServer(); -+ -+ void run(); -+ void send(uint64_t session_id, std::string msg); -+ -+ private: -+ bool start_accept(); -+ void accept(); -+ void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error); -+ -+ private: -+ CephContext* cct; -+ boost::asio::io_service m_io_service; // TODO wrapper it. -+ ProcessMsg m_server_process_msg; -+ stream_protocol::endpoint m_local_path; -+ stream_protocol::acceptor m_acceptor; -+ uint64_t m_session_id = 1; -+ std::map<uint64_t, CacheSessionPtr> m_session_map; -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc -new file mode 100644 -index 0000000..6cffb41 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheSession.cc -@@ -0,0 +1,115 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/debug.h" -+#include "common/ceph_context.h" -+#include "CacheSession.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \ -+ << __func__ << ": " -+ -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct) -+ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct) -+ {} -+ -+CacheSession::~CacheSession(){} -+ -+stream_protocol::socket& CacheSession::socket() { -+ return m_dm_socket; -+} -+ -+void CacheSession::start() { -+ if(true) { -+ serial_handing_request(); -+ } else { -+ parallel_handing_request(); -+ } -+} -+// flow: -+// -+// recv request --> process request --> reply ack -+// | | -+// --------------<------------------------- -+void CacheSession::serial_handing_request() { -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ boost::bind(&CacheSession::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+} -+ -+// flow : -+// -+// --> thread 1: process request -+// recv request --> thread 2: process request --> reply ack -+// --> thread n: process request -+// -+void CacheSession::parallel_handing_request() { -+ // TODO -+} -+ -+void CacheSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred) { -+ // when recv eof, the most proble is that client side close socket. -+ // so, server side need to end handing_request -+ if(error == boost::asio::error::eof) { -+ ldout(cct, 20) << "session: async_read : " << error.message() << dendl; -+ return; -+ } -+ -+ if(error) { -+ ldout(cct, 20) << "session: async_read fails: " << error.message() << dendl; -+ assert(0); -+ } -+ -+ if(bytes_transferred != RBDSC_MSG_LEN) { -+ ldout(cct, 20) << "session : request in-complete. "<<dendl; -+ assert(0); -+ } -+ -+ // TODO async_process can increse coding readable. -+ // process_msg_callback call handle async_send -+ process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); -+} -+ -+void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) { -+ if (error) { -+ ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl; -+ assert(0); -+ } -+ -+ if(bytes_transferred != RBDSC_MSG_LEN) { -+ ldout(cct, 20) << "session : reply in-complete. "<<dendl; -+ assert(0); -+ } -+ -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ boost::bind(&CacheSession::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ -+} -+ -+void CacheSession::send(std::string msg) { -+ boost::asio::async_write(m_dm_socket, -+ boost::asio::buffer(msg.c_str(), msg.size()), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ boost::bind(&CacheSession::handle_write, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ -+} -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h -new file mode 100644 -index 0000000..ce2591b ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheSession.h -@@ -0,0 +1,58 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_SESSION_H -+#define CEPH_CACHE_SESSION_H -+ -+#include <iostream> -+#include <string> -+#include <boost/bind.hpp> -+#include <boost/asio.hpp> -+#include <boost/asio/error.hpp> -+#include <boost/algorithm/string.hpp> -+ -+#include "include/assert.h" -+#include "SocketCommon.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class CacheSession : public std::enable_shared_from_this<CacheSession> { -+public: -+ CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct); -+ ~CacheSession(); -+ -+ stream_protocol::socket& socket(); -+ void start(); -+ void serial_handing_request(); -+ void parallel_handing_request(); -+ -+private: -+ -+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred); -+ -+ void handle_write(const boost::system::error_code& error, size_t bytes_transferred); -+ -+public: -+ void send(std::string msg); -+ -+private: -+ uint64_t m_session_id; -+ stream_protocol::socket m_dm_socket; -+ ProcessMsg process_msg; -+ CephContext* cct; -+ -+ // Buffer used to store data received from the client. -+ //std::array<char, 1024> data_; -+ char m_buffer[1024]; -+}; -+ -+typedef std::shared_ptr<CacheSession> CacheSessionPtr; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc -new file mode 100644 -index 0000000..50721ca ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc -@@ -0,0 +1,172 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "ObjectCacheStore.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \ -+ << __func__ << ": " -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) -+ : m_cct(cct), m_work_queue(work_queue), -+ m_rados(new librados::Rados()) { -+ -+ uint64_t object_cache_entries = -+ cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); -+ -+ //TODO(): allow to set level -+ m_policy = new SimplePolicy(object_cache_entries, 0.5); -+} -+ -+ObjectCacheStore::~ObjectCacheStore() { -+ delete m_policy; -+} -+ -+int ObjectCacheStore::init(bool reset) { -+ -+ int ret = m_rados->init_with_context(m_cct); -+ if(ret < 0) { -+ lderr(m_cct) << "fail to init Ceph context" << dendl; -+ return ret; -+ } -+ -+ ret = m_rados->connect(); -+ if(ret < 0 ) { -+ lderr(m_cct) << "fail to conect to cluster" << dendl; -+ return ret; -+ } -+ -+ std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); -+ //TODO(): check and reuse existing cache objects -+ if(reset) { -+ std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; -+ //TODO(): to use std::filesystem -+ int r = system(cmd.c_str()); -+ } -+ -+ evict_thd = new std::thread([this]{this->evict_thread_body();}); -+ return ret; -+} -+ -+int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { -+ int ret = 0; -+ std::string cache_file_name = pool_name + object_name; -+ -+ //TODO(): lock on ioctx map -+ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { -+ librados::IoCtx* io_ctx = new librados::IoCtx(); -+ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); -+ if (ret < 0) { -+ lderr(m_cct) << "fail to create ioctx" << dendl; -+ assert(0); -+ } -+ m_ioctxs.emplace(pool_name, io_ctx); -+ } -+ -+ assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); -+ -+ librados::IoCtx* ioctx = m_ioctxs[pool_name]; -+ -+ librados::bufferlist* read_buf = new librados::bufferlist(); -+ int object_size = 4096*1024; //TODO(): read config from image metadata -+ -+ //TODO(): async promote -+ ret = promote_object(ioctx, object_name, read_buf, object_size); -+ if (ret == -ENOENT) { -+ read_buf->append(std::string(object_size, '0')); -+ ret = 0; -+ } -+ -+ if( ret < 0) { -+ lderr(m_cct) << "fail to read from rados" << dendl; -+ return ret; -+ } -+ -+ // persistent to cache -+ librbd::cache::SyncFile cache_file(m_cct, cache_file_name); -+ cache_file.open(); -+ ret = cache_file.write_object_to_file(*read_buf, object_size); -+ -+ // update metadata -+ assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); -+ m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); -+ assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); -+ -+ return ret; -+ -+} -+ -+// return -1, client need to read data from cluster. -+// return 0, client directly read data from cache. -+int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { -+ -+ std::string cache_file_name = pool_name + object_name; -+ -+ CACHESTATUS ret; -+ ret = m_policy->lookup_object(cache_file_name); -+ -+ switch(ret) { -+ case OBJ_CACHE_NONE: -+ return do_promote(pool_name, object_name); -+ case OBJ_CACHE_PROMOTED: -+ return 0; -+ case OBJ_CACHE_PROMOTING: -+ default: -+ return -1; -+ } -+} -+ -+void ObjectCacheStore::evict_thread_body() { -+ int ret; -+ while(m_evict_go) { -+ ret = evict_objects(); -+ } -+} -+ -+ -+int ObjectCacheStore::shutdown() { -+ m_evict_go = false; -+ evict_thd->join(); -+ m_rados->shutdown(); -+ return 0; -+} -+ -+int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { -+ return 0; -+} -+ -+int ObjectCacheStore::lock_cache(std::string vol_name) { -+ return 0; -+} -+ -+int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { -+ int ret; -+ -+ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); -+ -+ ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); -+ if(ret < 0) { -+ lderr(m_cct) << "fail to read from rados" << dendl; -+ return ret; -+ } -+ read_completion->wait_for_complete(); -+ ret = read_completion->get_return_value(); -+ return ret; -+ -+} -+ -+int ObjectCacheStore::evict_objects() { -+ std::list<std::string> obj_list; -+ m_policy->get_evict_list(&obj_list); -+ for (auto& obj: obj_list) { -+ //do_evict(obj); -+ } -+} -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h -new file mode 100644 -index 0000000..d044b27 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h -@@ -0,0 +1,70 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H -+#define CEPH_CACHE_OBJECT_CACHE_STORE_H -+ -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+#include "librbd/cache/SharedPersistentObjectCacherFile.h" -+#include "SimplePolicy.hpp" -+ -+ -+using librados::Rados; -+using librados::IoCtx; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+typedef shared_ptr<librados::Rados> RadosRef; -+typedef shared_ptr<librados::IoCtx> IoCtxRef; -+ -+class ObjectCacheStore -+{ -+ public: -+ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); -+ ~ObjectCacheStore(); -+ -+ int init(bool reset); -+ -+ int shutdown(); -+ -+ int lookup_object(std::string pool_name, std::string object_name); -+ -+ int init_cache(std::string vol_name, uint64_t vol_size); -+ -+ int lock_cache(std::string vol_name); -+ -+ private: -+ void evict_thread_body(); -+ int evict_objects(); -+ -+ int do_promote(std::string pool_name, std::string object_name); -+ -+ int promote_object(librados::IoCtx*, std::string object_name, -+ librados::bufferlist* read_buf, -+ uint64_t length); -+ -+ CephContext *m_cct; -+ ContextWQ* m_work_queue; -+ RadosRef m_rados; -+ -+ -+ std::map<std::string, librados::IoCtx*> m_ioctxs; -+ -+ librbd::cache::SyncFile *m_cache_file; -+ -+ Policy* m_policy; -+ std::thread* evict_thd; -+ bool m_evict_go = false; -+}; -+ -+} // namespace ceph -+} // namespace immutable_obj_cache -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/Policy.hpp b/src/tools/ceph_immutable_object_cache/Policy.hpp -new file mode 100644 -index 0000000..8090202 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/Policy.hpp -@@ -0,0 +1,33 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_POLICY_HPP -+#define CEPH_CACHE_POLICY_HPP -+ -+#include <list> -+#include <string> -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+enum CACHESTATUS { -+ OBJ_CACHE_NONE = 0, -+ OBJ_CACHE_PROMOTING, -+ OBJ_CACHE_PROMOTED, -+}; -+ -+ -+class Policy { -+public: -+ Policy(){} -+ virtual ~Policy(){}; -+ virtual CACHESTATUS lookup_object(std::string) = 0; -+ virtual int evict_object(std::string&) = 0; -+ virtual void update_status(std::string, CACHESTATUS) = 0; -+ virtual CACHESTATUS get_status(std::string) = 0; -+ virtual void get_evict_list(std::list<std::string>* obj_list) = 0; -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp -new file mode 100644 -index 0000000..757ee6a ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp -@@ -0,0 +1,163 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP -+#define CEPH_CACHE_SIMPLE_POLICY_HPP -+ -+#include "Policy.hpp" -+#include "include/lru.h" -+#include "common/RWLock.h" -+#include "common/Mutex.h" -+ -+#include <vector> -+#include <unordered_map> -+#include <string> -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+ -+class SimplePolicy : public Policy { -+public: -+ SimplePolicy(uint64_t block_num, float watermark) -+ : m_watermark(watermark), m_entry_count(block_num), -+ m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), -+ m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") -+ { -+ -+ for(uint64_t i = 0; i < m_entry_count; i++) { -+ m_free_list.push_back(new Entry()); -+ } -+ -+ } -+ -+ ~SimplePolicy() { -+ for(uint64_t i = 0; i < m_entry_count; i++) { -+ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -+ delete entry; -+ m_free_list.pop_front(); -+ } -+ } -+ -+ CACHESTATUS lookup_object(std::string cache_file_name) { -+ -+ //TODO(): check race condition -+ RWLock::WLocker wlocker(m_cache_map_lock); -+ -+ auto entry_it = m_cache_map.find(cache_file_name); -+ if(entry_it == m_cache_map.end()) { -+ Mutex::Locker locker(m_free_list_lock); -+ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -+ assert(entry != nullptr); -+ m_free_list.pop_front(); -+ entry->status = OBJ_CACHE_PROMOTING; -+ -+ m_cache_map[cache_file_name] = entry; -+ -+ return OBJ_CACHE_NONE; -+ } -+ -+ Entry* entry = entry_it->second; -+ -+ if(entry->status == OBJ_CACHE_PROMOTED) { -+ // touch it -+ m_promoted_lru.lru_touch(entry); -+ } -+ -+ return entry->status; -+ } -+ -+ int evict_object(std::string& out_cache_file_name) { -+ RWLock::WLocker locker(m_cache_map_lock); -+ -+ return 1; -+ } -+ -+ // TODO(): simplify the logic -+ void update_status(std::string file_name, CACHESTATUS new_status) { -+ RWLock::WLocker locker(m_cache_map_lock); -+ -+ Entry* entry; -+ auto entry_it = m_cache_map.find(file_name); -+ -+ // just check. -+ if(new_status == OBJ_CACHE_PROMOTING) { -+ assert(entry_it == m_cache_map.end()); -+ } -+ -+ assert(entry_it != m_cache_map.end()); -+ -+ entry = entry_it->second; -+ -+ // promoting is done, so update it. -+ if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { -+ m_promoted_lru.lru_insert_top(entry); -+ entry->status = new_status; -+ return; -+ } -+ -+ assert(0); -+ } -+ -+ // get entry status -+ CACHESTATUS get_status(std::string file_name) { -+ RWLock::RLocker locker(m_cache_map_lock); -+ auto entry_it = m_cache_map.find(file_name); -+ if(entry_it == m_cache_map.end()) { -+ return OBJ_CACHE_NONE; -+ } -+ -+ return entry_it->second->status; -+ } -+ -+ void get_evict_list(std::list<std::string>* obj_list) { -+ RWLock::WLocker locker(m_cache_map_lock); -+ // check free ratio, pop entries from LRU -+ if (m_free_list.size() / m_entry_count < m_watermark) { -+ int evict_num = 10; //TODO(): make this configurable -+ for(int i = 0; i < evict_num; i++) { -+ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); -+ if (entry == nullptr) { -+ continue; -+ } -+ std::string file_name = entry->cache_file_name; -+ obj_list->push_back(file_name); -+ -+ auto entry_it = m_cache_map.find(file_name); -+ m_cache_map.erase(entry_it); -+ -+ //mark this entry as free -+ entry->status = OBJ_CACHE_NONE; -+ Mutex::Locker locker(m_free_list_lock); -+ m_free_list.push_back(entry); -+ } -+ } -+ } -+ -+private: -+ -+ class Entry : public LRUObject { -+ public: -+ CACHESTATUS status; -+ Entry() : status(OBJ_CACHE_NONE){} -+ std::string cache_file_name; -+ void encode(bufferlist &bl){} -+ void decode(bufferlist::iterator &it){} -+ }; -+ -+ float m_watermark; -+ uint64_t m_entry_count; -+ -+ std::unordered_map<std::string, Entry*> m_cache_map; -+ RWLock m_cache_map_lock; -+ -+ std::deque<Entry*> m_free_list; -+ Mutex m_free_list_lock; -+ -+ LRU m_promoted_lru; // include promoted, using status. -+ -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h -new file mode 100644 -index 0000000..53dca54 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/SocketCommon.h -@@ -0,0 +1,54 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_SOCKET_COMMON_H -+#define CEPH_CACHE_SOCKET_COMMON_H -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+static const int RBDSC_REGISTER = 0X11; -+static const int RBDSC_READ = 0X12; -+static const int RBDSC_LOOKUP = 0X13; -+static const int RBDSC_REGISTER_REPLY = 0X14; -+static const int RBDSC_READ_REPLY = 0X15; -+static const int RBDSC_LOOKUP_REPLY = 0X16; -+static const int RBDSC_READ_RADOS = 0X17; -+ -+ -+ -+typedef std::function<void(uint64_t, std::string)> ProcessMsg; -+typedef std::function<void(std::string)> ClientProcessMsg; -+typedef uint8_t rbdsc_req_type; -+ -+//TODO(): switch to bufferlist -+struct rbdsc_req_type_t { -+ rbdsc_req_type type; -+ uint64_t vol_size; -+ uint64_t offset; -+ uint64_t length; -+ char pool_name[256]; -+ char vol_name[256]; -+ -+ uint64_t size() { -+ return sizeof(rbdsc_req_type_t); -+ } -+ -+ std::string to_buffer() { -+ std::stringstream ss; -+ ss << type; -+ ss << vol_size; -+ ss << offset; -+ ss << length; -+ ss << pool_name; -+ ss << vol_name; -+ -+ return ss.str(); -+ } -+}; -+ -+static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc -new file mode 100644 -index 0000000..7a9131d ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/main.cc -@@ -0,0 +1,85 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/ceph_argparse.h" -+#include "common/config.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "global/global_init.h" -+#include "global/signal_handler.h" -+#include "CacheController.h" -+ -+#include <vector> -+ -+ceph::immutable_obj_cache::CacheController *cachectl = nullptr; -+ -+void usage() { -+ std::cout << "usage: cache controller [options...]" << std::endl; -+ std::cout << "options:\n"; -+ std::cout << " -m monaddress[:port] connect to specified monitor\n"; -+ std::cout << " --keyring=<path> path to keyring for local cluster\n"; -+ std::cout << " --log-file=<logfile> file to log debug output\n"; -+ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; -+ generic_server_usage(); -+} -+ -+static void handle_signal(int signum) -+{ -+ if (cachectl) -+ cachectl->handle_signal(signum); -+} -+ -+int main(int argc, const char **argv) -+{ -+ std::vector<const char*> args; -+ env_to_vec(args); -+ argv_to_vec(argc, argv, args); -+ -+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, -+ CODE_ENVIRONMENT_DAEMON, -+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); -+ -+ for (auto i = args.begin(); i != args.end(); ++i) { -+ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { -+ usage(); -+ return EXIT_SUCCESS; -+ } -+ } -+ -+ if (g_conf()->daemonize) { -+ global_init_daemonize(g_ceph_context); -+ } -+ g_ceph_context->enable_perf_counter(); -+ -+ common_init_finish(g_ceph_context); -+ -+ init_async_signal_handler(); -+ register_async_signal_handler(SIGHUP, sighup_handler); -+ register_async_signal_handler_oneshot(SIGINT, handle_signal); -+ register_async_signal_handler_oneshot(SIGTERM, handle_signal); -+ -+ std::vector<const char*> cmd_args; -+ argv_to_vec(argc, argv, cmd_args); -+ -+ // disable unnecessary librbd cache -+ g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); -+ -+ cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args); -+ int r = cachectl->init(); -+ if (r < 0) { -+ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; -+ goto cleanup; -+ } -+ -+ cachectl->run(); -+ -+ cleanup: -+ unregister_async_signal_handler(SIGHUP, sighup_handler); -+ unregister_async_signal_handler(SIGINT, handle_signal); -+ unregister_async_signal_handler(SIGTERM, handle_signal); -+ shutdown_async_signal_handler(); -+ -+ delete cachectl; -+ -+ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; -+} -diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt -deleted file mode 100644 -index 597d802..0000000 ---- a/src/tools/rbd_cache/CMakeLists.txt -+++ /dev/null -@@ -1,9 +0,0 @@ --add_executable(rbd-cache -- ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc -- ObjectCacheStore.cc -- CacheController.cc -- main.cc) --target_link_libraries(rbd-cache -- librados -- global) --install(TARGETS rbd-cache DESTINATION bin) -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -deleted file mode 100644 -index 620192c..0000000 ---- a/src/tools/rbd_cache/CacheController.cc -+++ /dev/null -@@ -1,116 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "CacheController.h" -- --#define dout_context g_ceph_context --#define dout_subsys ceph_subsys_rbd_cache --#undef dout_prefix --#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ -- << __func__ << ": " -- --namespace rbd { --namespace cache { -- --class ThreadPoolSingleton : public ThreadPool { --public: -- ContextWQ *op_work_queue; -- -- explicit ThreadPoolSingleton(CephContext *cct) -- : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, -- "pcache_threads"), -- op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", -- cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), -- this)) { -- start(); -- } -- ~ThreadPoolSingleton() override { -- op_work_queue->drain(); -- delete op_work_queue; -- -- stop(); -- } --}; -- -- --CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): -- m_args(args), m_cct(cct) { -- --} -- --CacheController::~CacheController() { -- --} -- --int CacheController::init() { -- ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( -- "rbd::cache::thread_pool", false, m_cct); -- pcache_op_work_queue = thread_pool_singleton->op_work_queue; -- -- m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); -- int r = m_object_cache_store->init(false); -- if (r < 0) { -- //derr << "init error\n" << dendl; -- } -- return r; --} -- --int CacheController::shutdown() { -- int r = m_object_cache_store->shutdown(); -- return r; --} -- --void CacheController::handle_signal(int signum){} -- --void CacheController::run() { -- try { -- //TODO(): use new socket path -- std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); -- std::remove(controller_path.c_str()); -- -- m_cache_server = new CacheServer(controller_path, -- ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); -- m_cache_server->run(); -- } catch (std::exception& e) { -- std::cerr << "Exception: " << e.what() << "\n"; -- } --} -- --void CacheController::handle_request(uint64_t session_id, std::string msg){ -- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -- -- int ret = 0; -- -- switch (io_ctx->type) { -- case RBDSC_REGISTER: { -- // init cache layout for volume -- m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); -- io_ctx->type = RBDSC_REGISTER_REPLY; -- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); -- -- break; -- } -- case RBDSC_READ: { -- // lookup object in local cache store -- ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); -- if (ret < 0) { -- io_ctx->type = RBDSC_READ_RADOS; -- } else { -- io_ctx->type = RBDSC_READ_REPLY; -- } -- if (io_ctx->type != RBDSC_READ_REPLY) { -- assert(0); -- } -- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); -- -- break; -- } -- std::cout<<"can't recongize request"<<std::endl; -- assert(0); // TODO replace it. -- } --} -- --} // namespace rbd --} // namespace cache -- -- -diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h -deleted file mode 100644 -index 0e23484..0000000 ---- a/src/tools/rbd_cache/CacheController.h -+++ /dev/null -@@ -1,54 +0,0 @@ --#ifndef CACHE_CONTROLLER_H --#define CACHE_CONTROLLER_H -- --#include <thread> -- --#include "common/Formatter.h" --#include "common/admin_socket.h" --#include "common/debug.h" --#include "common/errno.h" --#include "common/ceph_context.h" --#include "common/Mutex.h" --#include "common/WorkQueue.h" --#include "include/rados/librados.hpp" --#include "include/rbd/librbd.h" --#include "include/assert.h" --#include "librbd/ImageCtx.h" --#include "librbd/ImageState.h" -- --#include "CacheControllerSocket.hpp" --#include "ObjectCacheStore.h" -- -- --using boost::asio::local::stream_protocol; -- --namespace rbd { --namespace cache { -- --class CacheController { -- public: -- CacheController(CephContext *cct, const std::vector<const char*> &args); -- ~CacheController(); -- -- int init(); -- -- int shutdown(); -- -- void handle_signal(int sinnum); -- -- void run(); -- -- void handle_request(uint64_t sesstion_id, std::string msg); -- -- private: -- CacheServer *m_cache_server; -- std::vector<const char*> m_args; -- CephContext *m_cct; -- ObjectCacheStore *m_object_cache_store; -- ContextWQ* pcache_op_work_queue; --}; -- --} // namespace rbd --} // namespace cache -- --#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp -deleted file mode 100644 -index 2ff7477..0000000 ---- a/src/tools/rbd_cache/CacheControllerSocket.hpp -+++ /dev/null -@@ -1,228 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CACHE_CONTROLLER_SOCKET_H --#define CACHE_CONTROLLER_SOCKET_H -- --#include <cstdio> --#include <iostream> --#include <array> --#include <memory> --#include <string> --#include <boost/bind.hpp> --#include <boost/asio.hpp> --#include <boost/asio/error.hpp> --#include <boost/algorithm/string.hpp> --#include "CacheControllerSocketCommon.h" -- -- --using boost::asio::local::stream_protocol; -- --namespace rbd { --namespace cache { -- --class session : public std::enable_shared_from_this<session> { --public: -- session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) -- : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {} -- -- stream_protocol::socket& socket() { -- return m_dm_socket; -- } -- -- void start() { -- if(true) { -- serial_handing_request(); -- } else { -- parallel_handing_request(); -- } -- } -- // flow: -- // -- // recv request --> process request --> reply ack -- // | | -- // --------------<------------------------- -- void serial_handing_request() { -- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- boost::bind(&session::handle_read, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -- } -- -- // flow : -- // -- // --> thread 1: process request -- // recv request --> thread 2: process request --> reply ack -- // --> thread n: process request -- // -- void parallel_handing_request() { -- // TODO -- } -- --private: -- -- void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { -- // when recv eof, the most proble is that client side close socket. -- // so, server side need to end handing_request -- if(error == boost::asio::error::eof) { -- std::cout<<"session: async_read : " << error.message() << std::endl; -- return; -- } -- -- if(error) { -- std::cout<<"session: async_read fails: " << error.message() << std::endl; -- assert(0); -- } -- -- if(bytes_transferred != RBDSC_MSG_LEN) { -- std::cout<<"session : request in-complete. "<<std::endl; -- assert(0); -- } -- -- // TODO async_process can increse coding readable. -- // process_msg_callback call handle async_send -- process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); -- } -- -- void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { -- if (error) { -- std::cout<<"session: async_write fails: " << error.message() << std::endl; -- assert(0); -- } -- -- if(bytes_transferred != RBDSC_MSG_LEN) { -- std::cout<<"session : reply in-complete. "<<std::endl; -- assert(0); -- } -- -- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- boost::bind(&session::handle_read, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -- -- } -- --public: -- void send(std::string msg) { -- boost::asio::async_write(m_dm_socket, -- boost::asio::buffer(msg.c_str(), msg.size()), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- boost::bind(&session::handle_write, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -- -- } -- --private: -- uint64_t m_session_id; -- stream_protocol::socket m_dm_socket; -- ProcessMsg process_msg; -- -- // Buffer used to store data received from the client. -- //std::array<char, 1024> data_; -- char m_buffer[1024]; --}; -- --typedef std::shared_ptr<session> session_ptr; -- --class CacheServer { --public: -- CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) -- : m_cct(cct), m_server_process_msg(processmsg), -- m_local_path(file), -- m_acceptor(m_io_service) -- {} -- -- void run() { -- bool ret; -- ret = start_accept(); -- if(!ret) { -- return; -- } -- m_io_service.run(); -- } -- -- // TODO : use callback to replace this function. -- void send(uint64_t session_id, std::string msg) { -- auto it = m_session_map.find(session_id); -- if (it != m_session_map.end()) { -- it->second->send(msg); -- } else { -- // TODO : why don't find existing session id ? -- std::cout<<"don't find session id..."<<std::endl; -- assert(0); -- } -- } -- --private: -- // when creating one acceptor, can control every step in this way. -- bool start_accept() { -- boost::system::error_code ec; -- m_acceptor.open(m_local_path.protocol(), ec); -- if(ec) { -- std::cout << "m_acceptor open fails: " << ec.message() << std::endl; -- return false; -- } -- -- // TODO control acceptor attribute. -- -- m_acceptor.bind(m_local_path, ec); -- if(ec) { -- std::cout << "m_acceptor bind fails: " << ec.message() << std::endl; -- return false; -- } -- -- m_acceptor.listen(boost::asio::socket_base::max_connections, ec); -- if(ec) { -- std::cout << "m_acceptor listen fails: " << ec.message() << std::endl; -- return false; -- } -- -- accept(); -- return true; -- } -- -- void accept() { -- session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg)); -- m_acceptor.async_accept(new_session->socket(), -- boost::bind(&CacheServer::handle_accept, this, new_session, -- boost::asio::placeholders::error)); -- } -- -- void handle_accept(session_ptr new_session, const boost::system::error_code& error) { -- //TODO(): open librbd snap ... yuan -- -- if(error) { -- std::cout << "async accept fails : " << error.message() << std::endl; -- assert(0); // TODO -- } -- -- // must put session into m_session_map at the front of session.start() -- m_session_map.emplace(m_session_id, new_session); -- // TODO : session setting -- new_session->start(); -- m_session_id++; -- -- // lanuch next accept -- accept(); -- } -- --private: -- CephContext* m_cct; -- boost::asio::io_service m_io_service; // TODO wrapper it. -- ProcessMsg m_server_process_msg; -- stream_protocol::endpoint m_local_path; -- stream_protocol::acceptor m_acceptor; -- uint64_t m_session_id = 1; -- std::map<uint64_t, session_ptr> m_session_map; --}; -- --} // namespace cache --} // namespace rbd -- --#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -deleted file mode 100644 -index 964f888..0000000 ---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp -+++ /dev/null -@@ -1,229 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H --#define CACHE_CONTROLLER_SOCKET_CLIENT_H -- --#include <atomic> --#include <boost/asio.hpp> --#include <boost/bind.hpp> --#include <boost/asio/error.hpp> --#include <boost/algorithm/string.hpp> --#include "librbd/ImageCtx.h" --#include "include/assert.h" --#include "include/Context.h" --#include "CacheControllerSocketCommon.h" -- -- --using boost::asio::local::stream_protocol; -- --namespace rbd { --namespace cache { -- --class CacheClient { --public: -- CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) -- : m_io_service_work(m_io_service), -- m_dm_socket(m_io_service), -- m_client_process_msg(processmsg), -- m_ep(stream_protocol::endpoint(file)), -- m_session_work(false), -- cct(ceph_ctx) -- { -- // TODO wrapper io_service -- std::thread thd([this](){ -- m_io_service.run();}); -- thd.detach(); -- } -- -- void run(){ -- } -- -- bool is_session_work() { -- return m_session_work.load() == true; -- } -- -- // just when error occur, call this method. -- void close() { -- m_session_work.store(false); -- boost::system::error_code close_ec; -- m_dm_socket.close(close_ec); -- if(close_ec) { -- std::cout << "close: " << close_ec.message() << std::endl; -- } -- std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl; -- } -- -- int connect() { -- boost::system::error_code ec; -- m_dm_socket.connect(m_ep, ec); -- if(ec) { -- if(ec == boost::asio::error::connection_refused) { -- std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. " -- << "Now data will be read from ceph cluster " << std::endl; -- } else { -- std::cout << "connect: " << ec.message() << std::endl; -- } -- -- if(m_dm_socket.is_open()) { -- // Set to indicate what error occurred, if any. -- // Note that, even if the function indicates an error, -- // the underlying descriptor is closed. -- boost::system::error_code close_ec; -- m_dm_socket.close(close_ec); -- if(close_ec) { -- std::cout << "close: " << close_ec.message() << std::endl; -- } -- } -- return -1; -- } -- -- std::cout<<"connect success"<<std::endl; -- -- return 0; -- } -- -- int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { -- // cache controller will init layout -- rbdsc_req_type_t *message = new rbdsc_req_type_t(); -- message->type = RBDSC_REGISTER; -- memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -- memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); -- message->vol_size = vol_size; -- message->offset = 0; -- message->length = 0; -- -- uint64_t ret; -- boost::system::error_code ec; -- -- ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); -- if(ec) { -- std::cout << "write fails : " << ec.message() << std::endl; -- return -1; -- } -- -- if(ret != message->size()) { -- std::cout << "write fails : ret != send_bytes "<< std::endl; -- return -1; -- } -- -- // hard code TODO -- ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); -- if(ec == boost::asio::error::eof) { -- std::cout<< "recv eof"<<std::endl; -- return -1; -- } -- -- if(ec) { -- std::cout << "write fails : " << ec.message() << std::endl; -- return -1; -- } -- -- if(ret != RBDSC_MSG_LEN) { -- std::cout << "write fails : ret != receive bytes " << std::endl; -- return -1; -- } -- -- m_client_process_msg(std::string(m_recv_buffer, ret)); -- -- delete message; -- -- std::cout << "register volume success" << std::endl; -- -- // TODO -- m_session_work.store(true); -- -- return 0; -- } -- -- // if occur any error, we just return false. Then read from rados. -- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { -- rbdsc_req_type_t *message = new rbdsc_req_type_t(); -- message->type = RBDSC_READ; -- memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -- memcpy(message->vol_name, object_id.c_str(), object_id.size()); -- message->vol_size = 0; -- message->offset = 0; -- message->length = 0; -- -- boost::asio::async_write(m_dm_socket, -- boost::asio::buffer((char*)message, message->size()), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- [this, on_finish, message](const boost::system::error_code& err, size_t cb) { -- delete message; -- if(err) { -- std::cout<< "lookup_object: async_write fails." << err.message() << std::endl; -- close(); -- on_finish->complete(false); -- return; -- } -- if(cb != RBDSC_MSG_LEN) { -- std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl; -- close(); -- on_finish->complete(false); -- return; -- } -- get_result(on_finish); -- }); -- -- return 0; -- } -- -- void get_result(Context* on_finish) { -- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- [this, on_finish](const boost::system::error_code& err, size_t cb) { -- if(err == boost::asio::error::eof) { -- std::cout<<"get_result: ack is EOF." << std::endl; -- close(); -- on_finish->complete(false); -- return; -- } -- if(err) { -- std::cout<< "get_result: async_read fails:" << err.message() << std::endl; -- close(); -- on_finish->complete(false); // TODO replace this assert with some metohds. -- return; -- } -- if (cb != RBDSC_MSG_LEN) { -- close(); -- std::cout << "get_result: in-complete ack." << std::endl; -- on_finish->complete(false); // TODO: replace this assert with some methods. -- } -- -- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); -- -- // TODO: re-occur yuan's bug -- if(io_ctx->type == RBDSC_READ) { -- std::cout << "get rbdsc_read... " << std::endl; -- assert(0); -- } -- -- if (io_ctx->type == RBDSC_READ_REPLY) { -- on_finish->complete(true); -- return; -- } else { -- on_finish->complete(false); -- return; -- } -- }); -- } -- --private: -- boost::asio::io_service m_io_service; -- boost::asio::io_service::work m_io_service_work; -- stream_protocol::socket m_dm_socket; -- ClientProcessMsg m_client_process_msg; -- stream_protocol::endpoint m_ep; -- char m_recv_buffer[1024]; -- -- // atomic modfiy for this variable. -- // thread 1 : asio callback thread modify it. -- // thread 2 : librbd read it. -- std::atomic<bool> m_session_work; -- CephContext* cct; --}; -- --} // namespace cache --} // namespace rbd --#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -deleted file mode 100644 -index e17529a..0000000 ---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h -+++ /dev/null -@@ -1,62 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H --#define CACHE_CONTROLLER_SOCKET_COMMON_H -- --/* --#define RBDSC_REGISTER 0X11 --#define RBDSC_READ 0X12 --#define RBDSC_LOOKUP 0X13 --#define RBDSC_REGISTER_REPLY 0X14 --#define RBDSC_READ_REPLY 0X15 --#define RBDSC_LOOKUP_REPLY 0X16 --#define RBDSC_READ_RADOS 0X17 --*/ -- --namespace rbd { --namespace cache { -- --static const int RBDSC_REGISTER = 0X11; --static const int RBDSC_READ = 0X12; --static const int RBDSC_LOOKUP = 0X13; --static const int RBDSC_REGISTER_REPLY = 0X14; --static const int RBDSC_READ_REPLY = 0X15; --static const int RBDSC_LOOKUP_REPLY = 0X16; --static const int RBDSC_READ_RADOS = 0X17; -- -- -- --typedef std::function<void(uint64_t, std::string)> ProcessMsg; --typedef std::function<void(std::string)> ClientProcessMsg; --typedef uint8_t rbdsc_req_type; --struct rbdsc_req_type_t { -- rbdsc_req_type type; -- uint64_t vol_size; -- uint64_t offset; -- uint64_t length; -- char pool_name[256]; -- char vol_name[256]; -- -- uint64_t size() { -- return sizeof(rbdsc_req_type_t); -- } -- -- std::string to_buffer() { -- std::stringstream ss; -- ss << type; -- ss << vol_size; -- ss << offset; -- ss << length; -- ss << pool_name; -- ss << vol_name; -- -- return ss.str(); -- } --}; -- --static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); -- --} // namespace cache --} // namespace rbd --#endif -diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc -deleted file mode 100644 -index 99f90d6..0000000 ---- a/src/tools/rbd_cache/ObjectCacheStore.cc -+++ /dev/null -@@ -1,172 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "ObjectCacheStore.h" -- --#define dout_context g_ceph_context --#define dout_subsys ceph_subsys_rbd_cache --#undef dout_prefix --#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ -- << __func__ << ": " -- --namespace rbd { --namespace cache { -- --ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) -- : m_cct(cct), m_work_queue(work_queue), -- m_rados(new librados::Rados()) { -- -- uint64_t object_cache_entries = -- cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); -- -- //TODO(): allow to set level -- m_policy = new SimplePolicy(object_cache_entries, 0.5); --} -- --ObjectCacheStore::~ObjectCacheStore() { -- delete m_policy; --} -- --int ObjectCacheStore::init(bool reset) { -- -- int ret = m_rados->init_with_context(m_cct); -- if(ret < 0) { -- lderr(m_cct) << "fail to init Ceph context" << dendl; -- return ret; -- } -- -- ret = m_rados->connect(); -- if(ret < 0 ) { -- lderr(m_cct) << "fail to conect to cluster" << dendl; -- return ret; -- } -- -- std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); -- //TODO(): check and reuse existing cache objects -- if(reset) { -- std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; -- //TODO(): to use std::filesystem -- int r = system(cmd.c_str()); -- } -- -- evict_thd = new std::thread([this]{this->evict_thread_body();}); -- return ret; --} -- --int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { -- int ret = 0; -- std::string cache_file_name = pool_name + object_name; -- -- //TODO(): lock on ioctx map -- if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { -- librados::IoCtx* io_ctx = new librados::IoCtx(); -- ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); -- if (ret < 0) { -- lderr(m_cct) << "fail to create ioctx" << dendl; -- assert(0); -- } -- m_ioctxs.emplace(pool_name, io_ctx); -- } -- -- assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); -- -- librados::IoCtx* ioctx = m_ioctxs[pool_name]; -- -- librados::bufferlist* read_buf = new librados::bufferlist(); -- int object_size = 4096*1024; //TODO(): read config from image metadata -- -- //TODO(): async promote -- ret = promote_object(ioctx, object_name, read_buf, object_size); -- if (ret == -ENOENT) { -- read_buf->append(std::string(object_size, '0')); -- ret = 0; -- } -- -- if( ret < 0) { -- lderr(m_cct) << "fail to read from rados" << dendl; -- return ret; -- } -- -- // persistent to cache -- librbd::cache::SyncFile cache_file(m_cct, cache_file_name); -- cache_file.open(); -- ret = cache_file.write_object_to_file(*read_buf, object_size); -- -- // update metadata -- assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); -- m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); -- assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); -- -- return ret; -- --} -- --// return -1, client need to read data from cluster. --// return 0, client directly read data from cache. --int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { -- -- std::string cache_file_name = pool_name + object_name; -- -- CACHESTATUS ret; -- ret = m_policy->lookup_object(cache_file_name); -- -- switch(ret) { -- case OBJ_CACHE_NONE: -- return do_promote(pool_name, object_name); -- case OBJ_CACHE_PROMOTED: -- return 0; -- case OBJ_CACHE_PROMOTING: -- default: -- return -1; -- } --} -- --void ObjectCacheStore::evict_thread_body() { -- int ret; -- while(m_evict_go) { -- ret = evict_objects(); -- } --} -- -- --int ObjectCacheStore::shutdown() { -- m_evict_go = false; -- evict_thd->join(); -- m_rados->shutdown(); -- return 0; --} -- --int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { -- return 0; --} -- --int ObjectCacheStore::lock_cache(std::string vol_name) { -- return 0; --} -- --int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { -- int ret; -- -- librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); -- -- ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); -- if(ret < 0) { -- lderr(m_cct) << "fail to read from rados" << dendl; -- return ret; -- } -- read_completion->wait_for_complete(); -- ret = read_completion->get_return_value(); -- return ret; -- --} -- --int ObjectCacheStore::evict_objects() { -- std::list<std::string> obj_list; -- m_policy->get_evict_list(&obj_list); -- for (auto& obj: obj_list) { -- //do_evict(obj); -- } --} -- --} // namespace cache --} // namespace rbd -diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h -deleted file mode 100644 -index ba0e1f1..0000000 ---- a/src/tools/rbd_cache/ObjectCacheStore.h -+++ /dev/null -@@ -1,70 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef OBJECT_CACHE_STORE_H --#define OBJECT_CACHE_STORE_H -- --#include "common/debug.h" --#include "common/errno.h" --#include "common/ceph_context.h" --#include "common/Mutex.h" --#include "include/rados/librados.hpp" --#include "include/rbd/librbd.h" --#include "librbd/ImageCtx.h" --#include "librbd/ImageState.h" --#include "librbd/cache/SharedPersistentObjectCacherFile.h" --#include "SimplePolicy.hpp" -- -- --using librados::Rados; --using librados::IoCtx; -- --namespace rbd { --namespace cache { -- --typedef shared_ptr<librados::Rados> RadosRef; --typedef shared_ptr<librados::IoCtx> IoCtxRef; -- --class ObjectCacheStore --{ -- public: -- ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); -- ~ObjectCacheStore(); -- -- int init(bool reset); -- -- int shutdown(); -- -- int lookup_object(std::string pool_name, std::string object_name); -- -- int init_cache(std::string vol_name, uint64_t vol_size); -- -- int lock_cache(std::string vol_name); -- -- private: -- void evict_thread_body(); -- int evict_objects(); -- -- int do_promote(std::string pool_name, std::string object_name); -- -- int promote_object(librados::IoCtx*, std::string object_name, -- librados::bufferlist* read_buf, -- uint64_t length); -- -- CephContext *m_cct; -- ContextWQ* m_work_queue; -- RadosRef m_rados; -- -- -- std::map<std::string, librados::IoCtx*> m_ioctxs; -- -- librbd::cache::SyncFile *m_cache_file; -- -- Policy* m_policy; -- std::thread* evict_thd; -- bool m_evict_go = false; --}; -- --} // namespace rbd --} // namespace cache --#endif -diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp -deleted file mode 100644 -index 711e3bd..0000000 ---- a/src/tools/rbd_cache/Policy.hpp -+++ /dev/null -@@ -1,30 +0,0 @@ --#ifndef RBD_CACHE_POLICY_HPP --#define RBD_CACHE_POLICY_HPP -- --#include <list> --#include <string> -- --namespace rbd { --namespace cache { -- --enum CACHESTATUS { -- OBJ_CACHE_NONE = 0, -- OBJ_CACHE_PROMOTING, -- OBJ_CACHE_PROMOTED, --}; -- -- --class Policy { --public: -- Policy(){} -- virtual ~Policy(){}; -- virtual CACHESTATUS lookup_object(std::string) = 0; -- virtual int evict_object(std::string&) = 0; -- virtual void update_status(std::string, CACHESTATUS) = 0; -- virtual CACHESTATUS get_status(std::string) = 0; -- virtual void get_evict_list(std::list<std::string>* obj_list) = 0; --}; -- --} // namespace cache --} // namespace rbd --#endif -diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp -deleted file mode 100644 -index e785de1..0000000 ---- a/src/tools/rbd_cache/SimplePolicy.hpp -+++ /dev/null -@@ -1,160 +0,0 @@ --#ifndef RBD_CACHE_SIMPLE_POLICY_HPP --#define RBD_CACHE_SIMPLE_POLICY_HPP -- --#include "Policy.hpp" --#include "include/lru.h" --#include "common/RWLock.h" --#include "common/Mutex.h" -- --#include <vector> --#include <unordered_map> --#include <string> -- --namespace rbd { --namespace cache { -- -- --class SimplePolicy : public Policy { --public: -- SimplePolicy(uint64_t block_num, float watermark) -- : m_watermark(watermark), m_entry_count(block_num), -- m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), -- m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") -- { -- -- for(uint64_t i = 0; i < m_entry_count; i++) { -- m_free_list.push_back(new Entry()); -- } -- -- } -- -- ~SimplePolicy() { -- for(uint64_t i = 0; i < m_entry_count; i++) { -- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -- delete entry; -- m_free_list.pop_front(); -- } -- } -- -- CACHESTATUS lookup_object(std::string cache_file_name) { -- -- //TODO(): check race condition -- RWLock::WLocker wlocker(m_cache_map_lock); -- -- auto entry_it = m_cache_map.find(cache_file_name); -- if(entry_it == m_cache_map.end()) { -- Mutex::Locker locker(m_free_list_lock); -- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -- assert(entry != nullptr); -- m_free_list.pop_front(); -- entry->status = OBJ_CACHE_PROMOTING; -- -- m_cache_map[cache_file_name] = entry; -- -- return OBJ_CACHE_NONE; -- } -- -- Entry* entry = entry_it->second; -- -- if(entry->status == OBJ_CACHE_PROMOTED) { -- // touch it -- m_promoted_lru.lru_touch(entry); -- } -- -- return entry->status; -- } -- -- int evict_object(std::string& out_cache_file_name) { -- RWLock::WLocker locker(m_cache_map_lock); -- -- return 1; -- } -- -- // TODO(): simplify the logic -- void update_status(std::string file_name, CACHESTATUS new_status) { -- RWLock::WLocker locker(m_cache_map_lock); -- -- Entry* entry; -- auto entry_it = m_cache_map.find(file_name); -- -- // just check. -- if(new_status == OBJ_CACHE_PROMOTING) { -- assert(entry_it == m_cache_map.end()); -- } -- -- assert(entry_it != m_cache_map.end()); -- -- entry = entry_it->second; -- -- // promoting is done, so update it. -- if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { -- m_promoted_lru.lru_insert_top(entry); -- entry->status = new_status; -- return; -- } -- -- assert(0); -- } -- -- // get entry status -- CACHESTATUS get_status(std::string file_name) { -- RWLock::RLocker locker(m_cache_map_lock); -- auto entry_it = m_cache_map.find(file_name); -- if(entry_it == m_cache_map.end()) { -- return OBJ_CACHE_NONE; -- } -- -- return entry_it->second->status; -- } -- -- void get_evict_list(std::list<std::string>* obj_list) { -- RWLock::WLocker locker(m_cache_map_lock); -- // check free ratio, pop entries from LRU -- if (m_free_list.size() / m_entry_count < m_watermark) { -- int evict_num = 10; //TODO(): make this configurable -- for(int i = 0; i < evict_num; i++) { -- Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); -- if (entry == nullptr) { -- continue; -- } -- std::string file_name = entry->cache_file_name; -- obj_list->push_back(file_name); -- -- auto entry_it = m_cache_map.find(file_name); -- m_cache_map.erase(entry_it); -- -- //mark this entry as free -- entry->status = OBJ_CACHE_NONE; -- Mutex::Locker locker(m_free_list_lock); -- m_free_list.push_back(entry); -- } -- } -- } -- --private: -- -- class Entry : public LRUObject { -- public: -- CACHESTATUS status; -- Entry() : status(OBJ_CACHE_NONE){} -- std::string cache_file_name; -- void encode(bufferlist &bl){} -- void decode(bufferlist::iterator &it){} -- }; -- -- float m_watermark; -- uint64_t m_entry_count; -- -- std::unordered_map<std::string, Entry*> m_cache_map; -- RWLock m_cache_map_lock; -- -- std::deque<Entry*> m_free_list; -- Mutex m_free_list_lock; -- -- LRU m_promoted_lru; // include promoted, using status. -- --}; -- --} // namespace cache --} // namespace rbd --#endif -diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc -deleted file mode 100644 -index d604760..0000000 ---- a/src/tools/rbd_cache/main.cc -+++ /dev/null -@@ -1,85 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "common/ceph_argparse.h" --#include "common/config.h" --#include "common/debug.h" --#include "common/errno.h" --#include "global/global_init.h" --#include "global/signal_handler.h" --#include "CacheController.h" -- --#include <vector> -- --rbd::cache::CacheController *cachectl = nullptr; -- --void usage() { -- std::cout << "usage: cache controller [options...]" << std::endl; -- std::cout << "options:\n"; -- std::cout << " -m monaddress[:port] connect to specified monitor\n"; -- std::cout << " --keyring=<path> path to keyring for local cluster\n"; -- std::cout << " --log-file=<logfile> file to log debug output\n"; -- std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; -- generic_server_usage(); --} -- --static void handle_signal(int signum) --{ -- if (cachectl) -- cachectl->handle_signal(signum); --} -- --int main(int argc, const char **argv) --{ -- std::vector<const char*> args; -- env_to_vec(args); -- argv_to_vec(argc, argv, args); -- -- auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, -- CODE_ENVIRONMENT_DAEMON, -- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); -- -- for (auto i = args.begin(); i != args.end(); ++i) { -- if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { -- usage(); -- return EXIT_SUCCESS; -- } -- } -- -- if (g_conf()->daemonize) { -- global_init_daemonize(g_ceph_context); -- } -- g_ceph_context->enable_perf_counter(); -- -- common_init_finish(g_ceph_context); -- -- init_async_signal_handler(); -- register_async_signal_handler(SIGHUP, sighup_handler); -- register_async_signal_handler_oneshot(SIGINT, handle_signal); -- register_async_signal_handler_oneshot(SIGTERM, handle_signal); -- -- std::vector<const char*> cmd_args; -- argv_to_vec(argc, argv, cmd_args); -- -- // disable unnecessary librbd cache -- g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); -- -- cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args); -- int r = cachectl->init(); -- if (r < 0) { -- std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; -- goto cleanup; -- } -- -- cachectl->run(); -- -- cleanup: -- unregister_async_signal_handler(SIGHUP, sighup_handler); -- unregister_async_signal_handler(SIGINT, handle_signal); -- unregister_async_signal_handler(SIGTERM, handle_signal); -- shutdown_async_signal_handler(); -- -- delete cachectl; -- -- return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; --} --- -2.7.4 - diff --git a/src/ceph/ceph.rc b/src/ceph/ceph.rc index 9484318..39982a9 100644 --- a/src/ceph/ceph.rc +++ b/src/ceph/ceph.rc @@ -1,6 +1,6 @@ PROJECT="ceph" SUMMARY="a scalable distributed storage system" -BRANCH="mimic" +BRANCH="nautilus" REPO="https://github.com/ceph/ceph.git" OPTION="" @@ -11,14 +11,4 @@ OPTION="" # "0001-add-QAT-support.patch" \ # ) SOURCES=( - "0001-librbd-shared-persistent-read-only-rbd-cache.patch" \ - "0002-librbd-cleanup-rbd-shared-RO-cache.patch" \ - "0003-librbd-fix-bufferlist-point.patch" \ - "0004-librbd-fix-lookup-object-return.patch" \ - "0005-librbd-fix-conf-get_val.patch" \ - "0006-librbd-LRU-policy-based-eviction.patch" \ - "0007-librbd-cleanup-policy-based-promotion-eviction.patch" \ - "0008-librbd-implement-async-cache-lookup-and-read.patch" \ - "0009-librbd-clean-up-on-rbd-shared-cache.patch" \ - "0010-librbd-new-namespace-ceph-immutable-obj-cache.patch" \ ) |