diff options
Diffstat (limited to 'src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch')
-rw-r--r-- | src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch | 1685 |
1 files changed, 0 insertions, 1685 deletions
diff --git a/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch b/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch deleted file mode 100644 index 0476086..0000000 --- a/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch +++ /dev/null @@ -1,1685 +0,0 @@ -From b7b81562c76011abe05930330915a5ba423964e4 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Thu, 19 Apr 2018 22:54:36 +0800 -Subject: [PATCH 01/10] librbd: shared persistent read-only rbd cache - -This patch introduces introduces RBD shared persistent RO cache which -can provide client-side sharing cache for rbd clone/snapshot case. - -The key componenets are: - -- RBD cache daemon runs on each compute node to control the shared cache state - -- Read-only blocks from parent image(s) are cached in a shared area on - compute node(s) - -- Object level dispatcher inside librbd that can do RPC with cache daemon to - lookup the cache - -- Reads are served from the shared cache until the first COW request - -- Policy to control promotion/evication of the shared cache - -The general IO flow is: - -0) Parent image would register themselfs when initializing - -1) When read request on cloned image flows to parent image, it will check with - the cache daemon if the rarget object is ready - -2) Cache daemon receives the lookup request: - a) if the target object is promoted, daemon will ack with "read_from_cache" - b) if it is not promoted, daemon will check the policy whether to promote: - - if yes, daemon will do the promiton then ack with "read_from_cache" - - if no, daemon will ack with "read_from_rados" - -3) the read reqeust contines to do read from cache/rados based on the ack - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/common/options.cc | 8 ++ - src/librbd/CMakeLists.txt | 4 +- - src/librbd/ImageCtx.cc | 5 +- - src/librbd/ImageCtx.h | 3 + - src/librbd/cache/SharedPersistentObjectCacher.cc | 61 ++++++++ - src/librbd/cache/SharedPersistentObjectCacher.h | 45 ++++++ - .../SharedPersistentObjectCacherObjectDispatch.cc | 154 +++++++++++++++++++++ - .../SharedPersistentObjectCacherObjectDispatch.h | 127 +++++++++++++++++ - src/librbd/image/OpenRequest.cc | 12 +- - src/librbd/io/Types.h | 1 + - src/os/CacheStore/SyncFile.cc | 110 +++++++++++++++ - src/os/CacheStore/SyncFile.h | 74 ++++++++++ - src/test/librbd/test_mirroring.cc | 1 + - src/test/rbd_mirror/test_ImageReplayer.cc | 2 + - src/test/rbd_mirror/test_fixture.cc | 1 + - src/tools/CMakeLists.txt | 1 + - src/tools/rbd_cache/CMakeLists.txt | 9 ++ - src/tools/rbd_cache/CacheController.cc | 105 ++++++++++++++ - src/tools/rbd_cache/CacheController.hpp | 49 +++++++ - src/tools/rbd_cache/CacheControllerSocket.hpp | 125 +++++++++++++++++ - .../rbd_cache/CacheControllerSocketClient.hpp | 131 ++++++++++++++++++ - src/tools/rbd_cache/CacheControllerSocketCommon.h | 43 ++++++ - src/tools/rbd_cache/ObjectCacheStore.cc | 147 ++++++++++++++++++++ - src/tools/rbd_cache/ObjectCacheStore.h | 65 +++++++++ - src/tools/rbd_cache/main.cc | 85 ++++++++++++ - 25 files changed, 1365 insertions(+), 3 deletions(-) - create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.cc - create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.h - create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc - create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h - create mode 100644 src/os/CacheStore/SyncFile.cc - create mode 100644 src/os/CacheStore/SyncFile.h - create mode 100644 src/tools/rbd_cache/CMakeLists.txt - create mode 100644 src/tools/rbd_cache/CacheController.cc - create mode 100644 src/tools/rbd_cache/CacheController.hpp - create mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp - create mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp - create mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h - create mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc - create mode 100644 src/tools/rbd_cache/ObjectCacheStore.h - create mode 100644 src/tools/rbd_cache/main.cc - -diff --git a/src/common/options.cc b/src/common/options.cc -index c5afe4c..7839a31 100644 ---- a/src/common/options.cc -+++ b/src/common/options.cc -@@ -6357,6 +6357,14 @@ static std::vector<Option> get_rbd_options() { - .set_default(60) - .set_description("time in seconds for detecting a hung thread"), - -+ Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) -+ .set_default(true) -+ .set_description("whether to enable shared ssd caching"), -+ -+ Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED) -+ .set_default("/tmp") -+ .set_description("shared ssd caching data dir"), -+ - Option("rbd_non_blocking_aio", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) - .set_default(true) - .set_description("process AIO ops from a dispatch thread to prevent blocking"), -diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt -index b9c08d4..92539a8 100644 ---- a/src/librbd/CMakeLists.txt -+++ b/src/librbd/CMakeLists.txt -@@ -32,7 +32,8 @@ set(librbd_internal_srcs - api/Snapshot.cc - cache/ImageWriteback.cc - cache/ObjectCacherObjectDispatch.cc -- cache/PassthroughImageCache.cc -+ cache/SharedPersistentObjectCacherObjectDispatch.cc -+ cache/SharedPersistentObjectCacher.cc - deep_copy/ImageCopyRequest.cc - deep_copy/MetadataCopyRequest.cc - deep_copy/ObjectCopyRequest.cc -@@ -123,6 +124,7 @@ set(librbd_internal_srcs - trash/MoveRequest.cc - watcher/Notifier.cc - watcher/RewatchRequest.cc -+ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc - ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) - - add_library(rbd_api STATIC librbd.cc) -diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc -index 48f98b1..349156b 100644 ---- a/src/librbd/ImageCtx.cc -+++ b/src/librbd/ImageCtx.cc -@@ -776,7 +776,8 @@ public: - "rbd_qos_read_iops_limit", false)( - "rbd_qos_write_iops_limit", false)( - "rbd_qos_read_bps_limit", false)( -- "rbd_qos_write_bps_limit", false); -+ "rbd_qos_write_bps_limit", false)( -+ "rbd_shared_cache_enabled", false); - - ConfigProxy local_config_t{false}; - std::map<std::string, bufferlist> res; -@@ -844,6 +845,8 @@ public: - ASSIGN_OPTION(qos_write_iops_limit, uint64_t); - ASSIGN_OPTION(qos_read_bps_limit, uint64_t); - ASSIGN_OPTION(qos_write_bps_limit, uint64_t); -+ ASSIGN_OPTION(shared_cache_enabled, bool); -+ ASSIGN_OPTION(shared_cache_path, std::string); - - if (thread_safe) { - ASSIGN_OPTION(journal_pool, std::string); -diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h -index d197c24..f661c09 100644 ---- a/src/librbd/ImageCtx.h -+++ b/src/librbd/ImageCtx.h -@@ -204,6 +204,9 @@ namespace librbd { - uint64_t qos_read_bps_limit; - uint64_t qos_write_bps_limit; - -+ bool shared_cache_enabled; -+ std::string shared_cache_path; -+ - LibrbdAdminSocketHook *asok_hook; - - exclusive_lock::Policy *exclusive_lock_policy = nullptr; -diff --git a/src/librbd/cache/SharedPersistentObjectCacher.cc b/src/librbd/cache/SharedPersistentObjectCacher.cc -new file mode 100644 -index 0000000..a849260 ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacher.cc -@@ -0,0 +1,61 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "librbd/cache/SharedPersistentObjectCacher.h" -+#include "include/buffer.h" -+#include "common/dout.h" -+#include "librbd/ImageCtx.h" -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacher: " << this \ -+ << " " << __func__ << ": " -+ -+namespace librbd { -+namespace cache { -+ -+template <typename I> -+SharedPersistentObjectCacher<I>::SharedPersistentObjectCacher(I *image_ctx, std::string cache_path) -+ : m_image_ctx(image_ctx), m_cache_path(cache_path), -+ m_file_map_lock("librbd::cache::SharedObjectCacher::filemaplock") { -+ auto *cct = m_image_ctx->cct; -+ -+} -+ -+template <typename I> -+SharedPersistentObjectCacher<I>::~SharedPersistentObjectCacher() { -+ for(auto &it: file_map) { -+ if(it.second) { -+ delete it.second; -+ } -+ } -+} -+ -+template <typename I> -+int SharedPersistentObjectCacher<I>::read_object(std::string oid, ceph::bufferlist* read_data, uint64_t offset, uint64_t length, Context *on_finish) { -+ -+ auto *cct = m_image_ctx->cct; -+ ldout(cct, 20) << "object: " << oid << dendl; -+ -+ std::string cache_file_name = m_image_ctx->data_ctx.get_pool_name() + oid; -+ -+ //TODO(): make a cache for cachefile fd -+ os::CacheStore::SyncFile* target_cache_file = new os::CacheStore::SyncFile(cct, cache_file_name); -+ target_cache_file->open(); -+ -+ int ret = target_cache_file->read_object_from_file(read_data, offset, length); -+ if (ret < 0) { -+ ldout(cct, 5) << "read from file return error: " << ret -+ << "file name= " << cache_file_name -+ << dendl; -+ } -+ -+ delete target_cache_file; -+ return ret; -+} -+ -+ -+} // namespace cache -+} // namespace librbd -+ -+template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>; -diff --git a/src/librbd/cache/SharedPersistentObjectCacher.h b/src/librbd/cache/SharedPersistentObjectCacher.h -new file mode 100644 -index 0000000..d108a05 ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacher.h -@@ -0,0 +1,45 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER -+#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER -+ -+#include "include/buffer_fwd.h" -+#include "include/int_types.h" -+#include "os/CacheStore/SyncFile.h" -+#include "common/Mutex.h" -+#include <vector> -+#include <map> -+ -+struct Context; -+ -+namespace librbd { -+ -+struct ImageCtx; -+ -+namespace cache { -+ -+template <typename ImageCtxT> -+class SharedPersistentObjectCacher { -+public: -+ -+ SharedPersistentObjectCacher(ImageCtxT *image_ctx, std::string cache_path); -+ ~SharedPersistentObjectCacher(); -+ -+ int read_object(std::string oid, ceph::bufferlist* read_data, -+ uint64_t offset, uint64_t length, Context *on_finish); -+ -+private: -+ ImageCtxT *m_image_ctx; -+ std::map<std::string, os::CacheStore::SyncFile*> file_map; -+ Mutex m_file_map_lock; -+ std::string m_cache_path; -+ -+}; -+ -+} // namespace cache -+} // namespace librbd -+ -+extern template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>; -+ -+#endif // CEPH_LIBRBD_CACHE_FILE_IMAGE_STORE -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -new file mode 100644 -index 0000000..90d886c ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -@@ -0,0 +1,154 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h" -+#include "common/WorkQueue.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/Journal.h" -+#include "librbd/Utils.h" -+#include "librbd/LibrbdWriteback.h" -+#include "librbd/io/ObjectDispatchSpec.h" -+#include "librbd/io/ObjectDispatcher.h" -+#include "librbd/io/Utils.h" -+#include "osd/osd_types.h" -+#include "osdc/WritebackHandler.h" -+#include <vector> -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \ -+ << this << " " << __func__ << ": " -+ -+namespace librbd { -+namespace cache { -+ -+template <typename I> -+SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch( -+ I* image_ctx) : m_image_ctx(image_ctx) { -+} -+ -+template <typename I> -+SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { -+ if (m_object_store) { -+ delete m_object_store; -+ } -+ -+ if (m_cache_client) { -+ delete m_cache_client; -+ } -+} -+ -+template <typename I> -+void SharedPersistentObjectCacherObjectDispatch<I>::init() { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 5) << dendl; -+ -+ if (m_image_ctx->parent != nullptr) { -+ //TODO(): should we cover multi-leveled clone? -+ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; -+ return; -+ } -+ -+ ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; -+ -+ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -+ m_cache_client = new CacheClient(io_service, controller_path.c_str(), -+ ([&](std::string s){client_handle_request(s);})); -+ -+ int ret = m_cache_client->connect(); -+ if (ret < 0) { -+ ldout(cct, 5) << "SRO cache client fail to connect with local controller: " -+ << "please start rbd-cache daemon" -+ << dendl; -+ } else { -+ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " -+ << "name = " << m_image_ctx->id -+ << dendl; -+ -+ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), -+ m_image_ctx->id, m_image_ctx->size); -+ -+ if (ret >= 0) { -+ // add ourself to the IO object dispatcher chain -+ m_image_ctx->io_object_dispatcher->register_object_dispatch(this); -+ } -+ } -+} -+ -+template <typename I> -+bool SharedPersistentObjectCacherObjectDispatch<I>::read( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, librados::snap_t snap_id, int op_flags, -+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -+ io::ExtentMap* extent_map, int* object_dispatch_flags, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ // IO chained in reverse order -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" -+ << object_len << dendl; -+ -+ // ensure we aren't holding the cache lock post-read -+ on_dispatched = util::create_async_context_callback(*m_image_ctx, -+ on_dispatched); -+ -+ if (m_cache_client && m_cache_client->connected && m_object_store) { -+ bool exists; -+ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), -+ m_image_ctx->id, oid, &exists); -+ -+ // try to read from parent image -+ ldout(cct, 20) << "SRO cache object exists:" << exists << dendl; -+ if (exists) { -+ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -+ if (r != 0) { -+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; -+ on_dispatched->complete(r); -+ return true; -+ } -+ } -+ } -+ -+ ldout(cct, 20) << "Continue read from RADOS" << dendl; -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ return true; -+} -+ -+template <typename I> -+void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << dendl; -+ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -+ -+ switch (io_ctx->type) { -+ case RBDSC_REGISTER_REPLY: { -+ // open cache handler for volume -+ ldout(cct, 20) << "SRO cache client open cache handler" << dendl; -+ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); -+ -+ break; -+ } -+ case RBDSC_READ_REPLY: { -+ ldout(cct, 20) << "SRO cache client start to read cache" << dendl; -+ //TODO(): should call read here -+ -+ break; -+ } -+ case RBDSC_READ_RADOS: { -+ ldout(cct, 20) << "SRO cache client start to read rados" << dendl; -+ //TODO(): should call read here -+ -+ break; -+ } -+ default: ldout(cct, 20) << "nothing" << dendl; -+ break; -+ -+ } -+} -+ -+} // namespace cache -+} // namespace librbd -+ -+template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -new file mode 100644 -index 0000000..1ede804 ---- /dev/null -+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -@@ -0,0 +1,127 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -+#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -+ -+#include "librbd/io/ObjectDispatchInterface.h" -+#include "common/Mutex.h" -+#include "osdc/ObjectCacher.h" -+#include "tools/rbd_cache/CacheControllerSocketClient.hpp" -+#include "SharedPersistentObjectCacher.h" -+ -+struct WritebackHandler; -+ -+namespace librbd { -+ -+class ImageCtx; -+ -+namespace cache { -+ -+/** -+ * Facade around the OSDC object cacher to make it align with -+ * the object dispatcher interface -+ */ -+template <typename ImageCtxT = ImageCtx> -+class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface { -+public: -+ static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) { -+ return new SharedPersistentObjectCacherObjectDispatch(image_ctx); -+ } -+ -+ SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx); -+ ~SharedPersistentObjectCacherObjectDispatch() override; -+ -+ io::ObjectDispatchLayer get_object_dispatch_layer() const override { -+ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; -+ } -+ -+ void init(); -+ void shut_down(Context* on_finish) { -+ m_image_ctx->op_work_queue->queue(on_finish, 0); -+ } -+ -+ bool read( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, librados::snap_t snap_id, int op_flags, -+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -+ io::ExtentMap* extent_map, int* object_dispatch_flags, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) override; -+ -+ bool discard( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool write( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool write_same( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, io::Extents&& buffer_extents, -+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool compare_and_write( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, -+ const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, -+ int* object_dispatch_flags, uint64_t* journal_tid, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ return false; -+ } -+ -+ bool flush( -+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ return false; -+ } -+ -+ bool invalidate_cache(Context* on_finish) { -+ return false; -+ } -+ -+ bool reset_existence_cache(Context* on_finish) { -+ return false; -+ } -+ -+ void extent_overwritten( -+ uint64_t object_no, uint64_t object_off, uint64_t object_len, -+ uint64_t journal_tid, uint64_t new_journal_tid) { -+ } -+ -+ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; -+ -+private: -+ -+ ImageCtxT* m_image_ctx; -+ -+ void client_handle_request(std::string msg); -+ CacheClient *m_cache_client = nullptr; -+ boost::asio::io_service io_service; -+}; -+ -+} // namespace cache -+} // namespace librbd -+ -+extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; -+ -+#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H -diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc -index ae18739..30a7b66 100644 ---- a/src/librbd/image/OpenRequest.cc -+++ b/src/librbd/image/OpenRequest.cc -@@ -8,6 +8,7 @@ - #include "librbd/ImageCtx.h" - #include "librbd/Utils.h" - #include "librbd/cache/ObjectCacherObjectDispatch.h" -+#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc" - #include "librbd/image/CloseRequest.h" - #include "librbd/image/RefreshRequest.h" - #include "librbd/image/SetSnapRequest.h" -@@ -448,12 +449,21 @@ Context *OpenRequest<I>::handle_refresh(int *result) { - - template <typename I> - Context *OpenRequest<I>::send_init_cache(int *result) { -+ -+ CephContext *cct = m_image_ctx->cct; - // cache is disabled or parent image context - if (!m_image_ctx->cache || m_image_ctx->child != nullptr) { -+ -+ // enable Shared Read-only cache for parent image -+ if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) { -+ ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl; -+ auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx); -+ sro_cache->init(); -+ } -+ - return send_register_watch(result); - } - -- CephContext *cct = m_image_ctx->cct; - ldout(cct, 10) << this << " " << __func__ << dendl; - - auto cache = cache::ObjectCacherObjectDispatch<I>::create(m_image_ctx); -diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h -index 7e09c90..ef3049f 100644 ---- a/src/librbd/io/Types.h -+++ b/src/librbd/io/Types.h -@@ -59,6 +59,7 @@ enum DispatchResult { - enum ObjectDispatchLayer { - OBJECT_DISPATCH_LAYER_NONE = 0, - OBJECT_DISPATCH_LAYER_CACHE, -+ OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE, - OBJECT_DISPATCH_LAYER_JOURNAL, - OBJECT_DISPATCH_LAYER_CORE, - OBJECT_DISPATCH_LAYER_LAST -diff --git a/src/os/CacheStore/SyncFile.cc b/src/os/CacheStore/SyncFile.cc -new file mode 100644 -index 0000000..5352bde ---- /dev/null -+++ b/src/os/CacheStore/SyncFile.cc -@@ -0,0 +1,110 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "os/CacheStore/SyncFile.h" -+#include "include/Context.h" -+#include "common/dout.h" -+#include "common/WorkQueue.h" -+#include "librbd/ImageCtx.h" -+#include <sys/types.h> -+#include <sys/stat.h> -+#include <aio.h> -+#include <errno.h> -+#include <fcntl.h> -+#include <utility> -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::file::SyncFile: " << this << " " \ -+ << __func__ << ": " -+ -+namespace os { -+namespace CacheStore { -+ -+SyncFile::SyncFile(CephContext *cct, const std::string &name) -+ : cct(cct) -+{ -+ m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; -+ ldout(cct, 20) << "file path=" << m_name << dendl; -+} -+ -+SyncFile::~SyncFile() -+{ -+ // TODO force proper cleanup -+ if (m_fd != -1) { -+ ::close(m_fd); -+ } -+} -+ -+void SyncFile::open(Context *on_finish) -+{ -+ while (true) { -+ m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC, -+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); -+ if (m_fd == -1) { -+ int r = -errno; -+ if (r == -EINTR) { -+ continue; -+ } -+ on_finish->complete(r); -+ return; -+ } -+ break; -+ } -+ -+ on_finish->complete(0); -+} -+ -+void SyncFile::open() -+{ -+ while (true) -+ { -+ m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC, -+ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); -+ if (m_fd == -1) -+ { -+ int r = -errno; -+ if (r == -EINTR) { -+ continue; -+ } -+ return; -+ } -+ break; -+ } -+} -+ -+int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) { -+ -+ ldout(cct, 20) << "cache file name:" << m_name -+ << ", length:" << object_len << dendl; -+ -+ // TODO(): aio -+ int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0); -+ if(ret < 0) { -+ lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl; -+ return ret; -+ } -+ -+ return ret; -+} -+ -+int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) { -+ -+ ldout(cct, 20) << "offset:" << object_off -+ << ", length:" << object_len << dendl; -+ -+ bufferptr buf(object_len); -+ -+ // TODO(): aio -+ int ret = pread(m_fd, buf.c_str(), object_len, object_off); -+ if(ret < 0) { -+ lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl; -+ return ret; -+ } -+ read_buf->append(std::move(buf)); -+ -+ return ret; -+} -+ -+} // namespace CacheStore -+} // namespace os -diff --git a/src/os/CacheStore/SyncFile.h b/src/os/CacheStore/SyncFile.h -new file mode 100644 -index 0000000..81602ce ---- /dev/null -+++ b/src/os/CacheStore/SyncFile.h -@@ -0,0 +1,74 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBOS_CACHE_STORE_SYNC_FILE -+#define CEPH_LIBOS_CACHE_STORE_SYNC_FILE -+ -+#include "include/buffer_fwd.h" -+#include <sys/mman.h> -+#include <string> -+ -+struct Context; -+struct ContextWQ; -+class CephContext; -+ -+namespace os { -+ -+namespace CacheStore { -+ -+class SyncFile { -+public: -+ SyncFile(CephContext *cct, const std::string &name); -+ ~SyncFile(); -+ -+ // TODO use IO queue instead of individual commands so operations can be -+ // submitted in batch -+ -+ // TODO use scatter/gather API -+ -+ void open(Context *on_finish); -+ -+ // ## -+ void open(); -+ bool try_open(); -+ void close(Context *on_finish); -+ void remove(Context *on_finish); -+ -+ void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish); -+ -+ void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish); -+ -+ void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish); -+ -+ void truncate(uint64_t length, bool fdatasync, Context *on_finish); -+ -+ void fsync(Context *on_finish); -+ -+ void fdatasync(Context *on_finish); -+ -+ uint64_t filesize(); -+ -+ int load(void** dest, uint64_t filesize); -+ -+ int remove(); -+ -+ // ## -+ int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len); -+ int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len); -+ -+private: -+ CephContext *cct; -+ std::string m_name; -+ int m_fd = -1; -+ -+ int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync); -+ int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl); -+ int discard(uint64_t offset, uint64_t length, bool fdatasync); -+ int truncate(uint64_t length, bool fdatasync); -+ int fdatasync(); -+}; -+ -+} // namespace CacheStore -+} // namespace os -+ -+#endif // CEPH_LIBOS_CACHE_STORE_SYNC_FILE -diff --git a/src/test/librbd/test_mirroring.cc b/src/test/librbd/test_mirroring.cc -index b4fdeae..d7d1aa6 100644 ---- a/src/test/librbd/test_mirroring.cc -+++ b/src/test/librbd/test_mirroring.cc -@@ -47,6 +47,7 @@ public: - - void SetUp() override { - ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx)); -+ ASSERT_EQ(0, _rados.conf_set("rbd_shared_cache_enabled", "false")); - } - - std::string image_name = "mirrorimg1"; -diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc -index 8a95a65..b5598bd 100644 ---- a/src/test/rbd_mirror/test_ImageReplayer.cc -+++ b/src/test/rbd_mirror/test_ImageReplayer.cc -@@ -90,6 +90,7 @@ public: - EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get())); - EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false")); - EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_poll_age", "1")); -+ EXPECT_EQ(0, m_local_cluster->conf_set("rbd_shared_cache_enabled", "false")); - - m_local_pool_name = get_temp_pool_name(); - EXPECT_EQ(0, m_local_cluster->pool_create(m_local_pool_name.c_str())); -@@ -99,6 +100,7 @@ public: - - EXPECT_EQ("", connect_cluster_pp(m_remote_cluster)); - EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_cache", "false")); -+ EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_shared_cache_enabled", "false")); - - m_remote_pool_name = get_temp_pool_name(); - EXPECT_EQ(0, m_remote_cluster.pool_create(m_remote_pool_name.c_str())); -diff --git a/src/test/rbd_mirror/test_fixture.cc b/src/test/rbd_mirror/test_fixture.cc -index b2a51ca..9e77098 100644 ---- a/src/test/rbd_mirror/test_fixture.cc -+++ b/src/test/rbd_mirror/test_fixture.cc -@@ -27,6 +27,7 @@ void TestFixture::SetUpTestCase() { - _rados = std::shared_ptr<librados::Rados>(new librados::Rados()); - ASSERT_EQ("", connect_cluster_pp(*_rados.get())); - ASSERT_EQ(0, _rados->conf_set("rbd_cache", "false")); -+ ASSERT_EQ(0, _rados->conf_set("rbd_shared_cache_enabled", "false")); - - _local_pool_name = get_temp_pool_name("test-rbd-mirror-"); - ASSERT_EQ(0, _rados->pool_create(_local_pool_name.c_str())); -diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt -index 3789e3c..72ab342 100644 ---- a/src/tools/CMakeLists.txt -+++ b/src/tools/CMakeLists.txt -@@ -99,6 +99,7 @@ endif(WITH_CEPHFS) - if(WITH_RBD) - add_subdirectory(rbd) - add_subdirectory(rbd_mirror) -+ add_subdirectory(rbd_cache) - if(LINUX) - add_subdirectory(rbd_nbd) - endif() -diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt -new file mode 100644 -index 0000000..08eae60 ---- /dev/null -+++ b/src/tools/rbd_cache/CMakeLists.txt -@@ -0,0 +1,9 @@ -+add_executable(rbd-cache -+ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc -+ ObjectCacheStore.cc -+ CacheController.cc -+ main.cc) -+target_link_libraries(rbd-cache -+ librados -+ global) -+install(TARGETS rbd-cache DESTINATION bin) -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -new file mode 100644 -index 0000000..c914358 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheController.cc -@@ -0,0 +1,105 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "CacheController.hpp" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_rbd_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ -+ << __func__ << ": " -+ -+ -+class ThreadPoolSingleton : public ThreadPool { -+public: -+ ContextWQ *op_work_queue; -+ -+ explicit ThreadPoolSingleton(CephContext *cct) -+ : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, -+ "pcache_threads"), -+ op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", -+ cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"), -+ this)) { -+ start(); -+ } -+ ~ThreadPoolSingleton() override { -+ op_work_queue->drain(); -+ delete op_work_queue; -+ -+ stop(); -+ } -+}; -+ -+ -+CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): -+ m_args(args), m_cct(cct) { -+ -+} -+ -+CacheController::~CacheController() { -+ -+} -+ -+int CacheController::init() { -+ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( -+ "rbd::cache::thread_pool", false, m_cct); -+ pcache_op_work_queue = thread_pool_singleton->op_work_queue; -+ -+ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); -+ int r = m_object_cache_store->init(false); -+ if (r < 0) { -+ //derr << "init error\n" << dendl; -+ } -+ return r; -+} -+ -+int CacheController::shutdown() { -+ int r = m_object_cache_store->shutdown(); -+ return r; -+} -+ -+void CacheController::handle_signal(int signum){} -+ -+void CacheController::run() { -+ try { -+ //TODO(): use new socket path -+ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; -+ std::remove(controller_path.c_str()); -+ -+ m_cache_server = new CacheServer(io_service, controller_path, -+ ([&](uint64_t p, std::string s){handle_request(p, s);})); -+ io_service.run(); -+ } catch (std::exception& e) { -+ std::cerr << "Exception: " << e.what() << "\n"; -+ } -+} -+ -+void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -+ -+ int ret = 0; -+ -+ switch (io_ctx->type) { -+ case RBDSC_REGISTER: { -+ // init cache layout for volume -+ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); -+ io_ctx->type = RBDSC_REGISTER_REPLY; -+ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); -+ -+ break; -+ } -+ case RBDSC_READ: { -+ // lookup object in local cache store -+ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); -+ if (ret < 0) { -+ io_ctx->type = RBDSC_READ_RADOS; -+ } else { -+ io_ctx->type = RBDSC_READ_REPLY; -+ } -+ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); -+ -+ break; -+ } -+ -+ } -+} -diff --git a/src/tools/rbd_cache/CacheController.hpp b/src/tools/rbd_cache/CacheController.hpp -new file mode 100644 -index 0000000..97113e4 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheController.hpp -@@ -0,0 +1,49 @@ -+#ifndef CACHE_CONTROLLER_H -+#define CACHE_CONTROLLER_H -+ -+#include <thread> -+ -+#include "common/Formatter.h" -+#include "common/admin_socket.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "common/WorkQueue.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "include/assert.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+ -+#include "CacheControllerSocket.hpp" -+#include "ObjectCacheStore.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+class CacheController { -+ public: -+ CacheController(CephContext *cct, const std::vector<const char*> &args); -+ ~CacheController(); -+ -+ int init(); -+ -+ int shutdown(); -+ -+ void handle_signal(int sinnum); -+ -+ void run(); -+ -+ void handle_request(uint64_t sesstion_id, std::string msg); -+ -+ private: -+ boost::asio::io_service io_service; -+ CacheServer *m_cache_server; -+ std::vector<const char*> m_args; -+ CephContext *m_cct; -+ ObjectCacheStore *m_object_cache_store; -+ ContextWQ* pcache_op_work_queue; -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp -new file mode 100644 -index 0000000..6e1a743 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp -@@ -0,0 +1,125 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CACHE_CONTROLLER_SOCKET_H -+#define CACHE_CONTROLLER_SOCKET_H -+ -+#include <cstdio> -+#include <iostream> -+#include <array> -+#include <memory> -+#include <string> -+#include <boost/bind.hpp> -+#include <boost/asio.hpp> -+#include <boost/algorithm/string.hpp> -+#include "CacheControllerSocketCommon.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+class session : public std::enable_shared_from_this<session> { -+public: -+ session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) -+ : session_id(session_id), socket_(io_service), process_msg(processmsg) {} -+ -+ stream_protocol::socket& socket() { -+ return socket_; -+ } -+ -+ void start() { -+ -+ boost::asio::async_read(socket_, boost::asio::buffer(data_), -+ boost::asio::transfer_exactly(544), -+ boost::bind(&session::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ -+ } -+ -+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { -+ -+ if (!error) { -+ -+ process_msg(session_id, std::string(data_, bytes_transferred)); -+ -+ } -+ } -+ -+ void handle_write(const boost::system::error_code& error) { -+ if (!error) { -+ socket_.async_read_some(boost::asio::buffer(data_), -+ boost::bind(&session::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ } -+ } -+ -+ void send(std::string msg) { -+ -+ boost::asio::async_write(socket_, -+ boost::asio::buffer(msg.c_str(), msg.size()), -+ boost::bind(&session::handle_write, -+ shared_from_this(), -+ boost::asio::placeholders::error)); -+ -+ } -+ -+private: -+ uint64_t session_id; -+ stream_protocol::socket socket_; -+ ProcessMsg process_msg; -+ -+ // Buffer used to store data received from the client. -+ //std::array<char, 1024> data_; -+ char data_[1024]; -+}; -+ -+typedef std::shared_ptr<session> session_ptr; -+ -+class CacheServer { -+public: -+ CacheServer(boost::asio::io_service& io_service, -+ const std::string& file, ProcessMsg processmsg) -+ : io_service_(io_service), -+ server_process_msg(processmsg), -+ acceptor_(io_service, stream_protocol::endpoint(file)) -+ { -+ session_ptr new_session(new session(session_id, io_service_, server_process_msg)); -+ acceptor_.async_accept(new_session->socket(), -+ boost::bind(&CacheServer::handle_accept, this, new_session, -+ boost::asio::placeholders::error)); -+ } -+ -+ void handle_accept(session_ptr new_session, -+ const boost::system::error_code& error) -+ { -+ //TODO(): open librbd snap -+ if (!error) { -+ new_session->start(); -+ session_map.emplace(session_id, new_session); -+ session_id++; -+ new_session.reset(new session(session_id, io_service_, server_process_msg)); -+ acceptor_.async_accept(new_session->socket(), -+ boost::bind(&CacheServer::handle_accept, this, new_session, -+ boost::asio::placeholders::error)); -+ } -+ } -+ -+ void send(uint64_t session_id, std::string msg) { -+ auto it = session_map.find(session_id); -+ if (it != session_map.end()) { -+ it->second->send(msg); -+ } -+ } -+ -+private: -+ boost::asio::io_service& io_service_; -+ ProcessMsg server_process_msg; -+ stream_protocol::acceptor acceptor_; -+ uint64_t session_id = 1; -+ std::map<uint64_t, session_ptr> session_map; -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -new file mode 100644 -index 0000000..8e61aa9 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -@@ -0,0 +1,131 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H -+#define CACHE_CONTROLLER_SOCKET_CLIENT_H -+ -+#include <boost/asio.hpp> -+#include <boost/bind.hpp> -+#include <boost/algorithm/string.hpp> -+#include "include/assert.h" -+#include "CacheControllerSocketCommon.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+class CacheClient { -+public: -+ CacheClient(boost::asio::io_service& io_service, -+ const std::string& file, ClientProcessMsg processmsg) -+ : io_service_(io_service), -+ io_service_work_(io_service), -+ socket_(io_service), -+ m_client_process_msg(processmsg), -+ ep_(stream_protocol::endpoint(file)) -+ { -+ std::thread thd([this](){io_service_.run(); }); -+ thd.detach(); -+ } -+ -+ void run(){ -+ } -+ -+ int connect() { -+ try { -+ socket_.connect(ep_); -+ } catch (std::exception& e) { -+ return -1; -+ } -+ connected = true; -+ return 0; -+ } -+ -+ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { -+ // cache controller will init layout -+ rbdsc_req_type_t *message = new rbdsc_req_type_t(); -+ message->type = RBDSC_REGISTER; -+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -+ memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); -+ message->vol_size = vol_size; -+ message->offset = 0; -+ message->length = 0; -+ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -+ [this](const boost::system::error_code& err, size_t cb) { -+ if (!err) { -+ boost::asio::async_read(socket_, boost::asio::buffer(buffer_), -+ boost::asio::transfer_exactly(544), -+ [this](const boost::system::error_code& err, size_t cb) { -+ if (!err) { -+ m_client_process_msg(std::string(buffer_, cb)); -+ } else { -+ return -1; -+ } -+ }); -+ } else { -+ return -1; -+ } -+ }); -+ -+ return 0; -+ } -+ -+ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) { -+ rbdsc_req_type_t *message = new rbdsc_req_type_t(); -+ message->type = RBDSC_READ; -+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -+ memcpy(message->vol_name, object_id.c_str(), object_id.size()); -+ message->vol_size = 0; -+ message->offset = 0; -+ message->length = 0; -+ -+ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), -+ [this, result](const boost::system::error_code& err, size_t cb) { -+ if (!err) { -+ get_result(result); -+ } else { -+ return -1; -+ } -+ }); -+ std::unique_lock<std::mutex> lk(m); -+ cv.wait(lk); -+ return 0; -+ } -+ -+ void get_result(bool* result) { -+ boost::asio::async_read(socket_, boost::asio::buffer(buffer_), -+ boost::asio::transfer_exactly(544), -+ [this, result](const boost::system::error_code& err, size_t cb) { -+ if (!err) { -+ *result = true; -+ cv.notify_one(); -+ m_client_process_msg(std::string(buffer_, cb)); -+ } else { -+ return -1; -+ } -+ }); -+ } -+ -+ void handle_connect(const boost::system::error_code& error) { -+ //TODO(): open librbd snap -+ } -+ -+ void handle_write(const boost::system::error_code& error) { -+ } -+ -+private: -+ boost::asio::io_service& io_service_; -+ boost::asio::io_service::work io_service_work_; -+ stream_protocol::socket socket_; -+ ClientProcessMsg m_client_process_msg; -+ stream_protocol::endpoint ep_; -+ char buffer_[1024]; -+ int block_size_ = 1024; -+ -+ std::condition_variable cv; -+ std::mutex m; -+ -+public: -+ bool connected = false; -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -new file mode 100644 -index 0000000..e253bb1 ---- /dev/null -+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h -@@ -0,0 +1,43 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H -+#define CACHE_CONTROLLER_SOCKET_COMMON_H -+ -+#define RBDSC_REGISTER 0X11 -+#define RBDSC_READ 0X12 -+#define RBDSC_LOOKUP 0X13 -+#define RBDSC_REGISTER_REPLY 0X14 -+#define RBDSC_READ_REPLY 0X15 -+#define RBDSC_LOOKUP_REPLY 0X16 -+#define RBDSC_READ_RADOS 0X16 -+ -+typedef std::function<void(uint64_t, std::string)> ProcessMsg; -+typedef std::function<void(std::string)> ClientProcessMsg; -+typedef uint8_t rbdsc_req_type; -+struct rbdsc_req_type_t { -+ rbdsc_req_type type; -+ uint64_t vol_size; -+ uint64_t offset; -+ uint64_t length; -+ char pool_name[256]; -+ char vol_name[256]; -+ -+ uint64_t size() { -+ return sizeof(rbdsc_req_type_t); -+ } -+ -+ std::string to_buffer() { -+ std::stringstream ss; -+ ss << type; -+ ss << vol_size; -+ ss << offset; -+ ss << length; -+ ss << pool_name; -+ ss << vol_name; -+ -+ return ss.str(); -+ } -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc -new file mode 100644 -index 0000000..90b407c ---- /dev/null -+++ b/src/tools/rbd_cache/ObjectCacheStore.cc -@@ -0,0 +1,147 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "ObjectCacheStore.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_rbd_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ -+ << __func__ << ": " -+ -+ -+ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) -+ : m_cct(cct), m_work_queue(work_queue), -+ m_cache_table_lock("rbd::cache::ObjectCacheStore"), -+ m_rados(new librados::Rados()) { -+} -+ -+ObjectCacheStore::~ObjectCacheStore() { -+ -+} -+ -+int ObjectCacheStore::init(bool reset) { -+ -+ int ret = m_rados->init_with_context(m_cct); -+ if(ret < 0) { -+ lderr(m_cct) << "fail to init Ceph context" << dendl; -+ return ret; -+ } -+ -+ ret = m_rados->connect(); -+ if(ret < 0 ) { -+ lderr(m_cct) << "fail to conect to cluster" << dendl; -+ return ret; -+ } -+ //TODO(): check existing cache objects -+ return ret; -+} -+ -+int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { -+ int ret = 0; -+ std::string cache_file_name = pool_name + object_name; -+ -+ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { -+ librados::IoCtx* io_ctx = new librados::IoCtx(); -+ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); -+ if (ret < 0) { -+ lderr(m_cct) << "fail to create ioctx" << dendl; -+ assert(0); -+ } -+ m_ioctxs.emplace(pool_name, io_ctx); -+ } -+ -+ assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); -+ -+ librados::IoCtx* ioctx = m_ioctxs[pool_name]; -+ -+ //promoting: update metadata -+ { -+ Mutex::Locker locker(m_cache_table_lock); -+ m_cache_table.emplace(cache_file_name, PROMOTING); -+ } -+ -+ librados::bufferlist read_buf; -+ int object_size = 4096*1024; //TODO(): read config from image metadata -+ -+ //TODO(): async promote -+ ret = promote_object(ioctx, object_name, read_buf, object_size); -+ if (ret == -ENOENT) { -+ read_buf.append(std::string(object_size, '0')); -+ ret = 0; -+ } -+ -+ if( ret < 0) { -+ lderr(m_cct) << "fail to read from rados" << dendl; -+ return ret; -+ } -+ -+ // persistent to cache -+ os::CacheStore::SyncFile cache_file(m_cct, cache_file_name); -+ cache_file.open(); -+ ret = cache_file.write_object_to_file(read_buf, object_size); -+ -+ assert(m_cache_table.find(cache_file_name) != m_cache_table.end()); -+ -+ // update metadata -+ { -+ Mutex::Locker locker(m_cache_table_lock); -+ m_cache_table.emplace(cache_file_name, PROMOTED); -+ } -+ -+ return ret; -+ -+} -+ -+int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { -+ -+ std::string cache_file_name = pool_name + object_name; -+ { -+ Mutex::Locker locker(m_cache_table_lock); -+ -+ auto it = m_cache_table.find(cache_file_name); -+ if (it != m_cache_table.end()) { -+ -+ if (it->second == PROMOTING) { -+ return -1; -+ } else if (it->second == PROMOTED) { -+ return 0; -+ } else { -+ assert(0); -+ } -+ } -+ } -+ -+ int ret = do_promote(pool_name, object_name); -+ -+ return ret; -+} -+ -+int ObjectCacheStore::shutdown() { -+ m_rados->shutdown(); -+ return 0; -+} -+ -+int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { -+ return 0; -+} -+ -+int ObjectCacheStore::lock_cache(std::string vol_name) { -+ return 0; -+} -+ -+int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist read_buf, uint64_t read_len) { -+ int ret; -+ -+ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); -+ -+ ret = ioctx->aio_read(object_name, read_completion, &read_buf, read_len, 0); -+ if(ret < 0) { -+ lderr(m_cct) << "fail to read from rados" << dendl; -+ return ret; -+ } -+ read_completion->wait_for_complete(); -+ ret = read_completion->get_return_value(); -+ return ret; -+ -+} -diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h -new file mode 100644 -index 0000000..12f8399 ---- /dev/null -+++ b/src/tools/rbd_cache/ObjectCacheStore.h -@@ -0,0 +1,65 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef OBJECT_CACHE_STORE_H -+#define OBJECT_CACHE_STORE_H -+ -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+#include "os/CacheStore/SyncFile.h" -+ -+using librados::Rados; -+using librados::IoCtx; -+ -+typedef shared_ptr<librados::Rados> RadosRef; -+typedef shared_ptr<librados::IoCtx> IoCtxRef; -+ -+class ObjectCacheStore -+{ -+ public: -+ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); -+ ~ObjectCacheStore(); -+ -+ int init(bool reset); -+ -+ int shutdown(); -+ -+ int lookup_object(std::string pool_name, std::string object_name); -+ -+ int init_cache(std::string vol_name, uint64_t vol_size); -+ -+ int lock_cache(std::string vol_name); -+ -+ private: -+ int _evict_object(); -+ -+ int do_promote(std::string pool_name, std::string object_name); -+ -+ int promote_object(librados::IoCtx*, std::string object_name, -+ librados::bufferlist read_buf, -+ uint64_t length); -+ -+ enum { -+ PROMOTING = 0, -+ PROMOTED, -+ }; -+ -+ CephContext *m_cct; -+ ContextWQ* m_work_queue; -+ Mutex m_cache_table_lock; -+ RadosRef m_rados; -+ -+ std::map<std::string, uint8_t> m_cache_table; -+ -+ std::map<std::string, librados::IoCtx*> m_ioctxs; -+ -+ os::CacheStore::SyncFile *m_cache_file; -+}; -+ -+#endif -diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc -new file mode 100644 -index 0000000..336a581 ---- /dev/null -+++ b/src/tools/rbd_cache/main.cc -@@ -0,0 +1,85 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/ceph_argparse.h" -+#include "common/config.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "global/global_init.h" -+#include "global/signal_handler.h" -+#include "CacheController.hpp" -+ -+#include <vector> -+ -+CacheController *cachectl = nullptr; -+ -+void usage() { -+ std::cout << "usage: cache controller [options...]" << std::endl; -+ std::cout << "options:\n"; -+ std::cout << " -m monaddress[:port] connect to specified monitor\n"; -+ std::cout << " --keyring=<path> path to keyring for local cluster\n"; -+ std::cout << " --log-file=<logfile> file to log debug output\n"; -+ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; -+ generic_server_usage(); -+} -+ -+static void handle_signal(int signum) -+{ -+ if (cachectl) -+ cachectl->handle_signal(signum); -+} -+ -+int main(int argc, const char **argv) -+{ -+ std::vector<const char*> args; -+ env_to_vec(args); -+ argv_to_vec(argc, argv, args); -+ -+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, -+ CODE_ENVIRONMENT_DAEMON, -+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); -+ -+ for (auto i = args.begin(); i != args.end(); ++i) { -+ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { -+ usage(); -+ return EXIT_SUCCESS; -+ } -+ } -+ -+ if (g_conf->daemonize) { -+ global_init_daemonize(g_ceph_context); -+ } -+ g_ceph_context->enable_perf_counter(); -+ -+ common_init_finish(g_ceph_context); -+ -+ init_async_signal_handler(); -+ register_async_signal_handler(SIGHUP, sighup_handler); -+ register_async_signal_handler_oneshot(SIGINT, handle_signal); -+ register_async_signal_handler_oneshot(SIGTERM, handle_signal); -+ -+ std::vector<const char*> cmd_args; -+ argv_to_vec(argc, argv, cmd_args); -+ -+ // disable unnecessary librbd cache -+ g_ceph_context->_conf->set_val_or_die("rbd_cache", "false"); -+ -+ cachectl = new CacheController(g_ceph_context, cmd_args); -+ int r = cachectl->init(); -+ if (r < 0) { -+ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; -+ goto cleanup; -+ } -+ -+ cachectl->run(); -+ -+ cleanup: -+ unregister_async_signal_handler(SIGHUP, sighup_handler); -+ unregister_async_signal_handler(SIGINT, handle_signal); -+ unregister_async_signal_handler(SIGTERM, handle_signal); -+ shutdown_async_signal_handler(); -+ -+ delete cachectl; -+ -+ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; -+} --- -2.7.4 - |