Diffstat (limited to 'src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch')
-rw-r--r-- | src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch | 1685
1 file changed, 1685 insertions, 0 deletions
diff --git a/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch b/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch new file mode 100644 index 0000000..0476086 --- /dev/null +++ b/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch @@ -0,0 +1,1685 @@ +From b7b81562c76011abe05930330915a5ba423964e4 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Thu, 19 Apr 2018 22:54:36 +0800 +Subject: [PATCH 01/10] librbd: shared persistent read-only rbd cache + +This patch introduces introduces RBD shared persistent RO cache which +can provide client-side sharing cache for rbd clone/snapshot case. + +The key componenets are: + +- RBD cache daemon runs on each compute node to control the shared cache state + +- Read-only blocks from parent image(s) are cached in a shared area on + compute node(s) + +- Object level dispatcher inside librbd that can do RPC with cache daemon to + lookup the cache + +- Reads are served from the shared cache until the first COW request + +- Policy to control promotion/evication of the shared cache + +The general IO flow is: + +0) Parent image would register themselfs when initializing + +1) When read request on cloned image flows to parent image, it will check with + the cache daemon if the rarget object is ready + +2) Cache daemon receives the lookup request: + a) if the target object is promoted, daemon will ack with "read_from_cache" + b) if it is not promoted, daemon will check the policy whether to promote: + - if yes, daemon will do the promiton then ack with "read_from_cache" + - if no, daemon will ack with "read_from_rados" + +3) the read reqeust contines to do read from cache/rados based on the ack + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/common/options.cc | 8 ++ + src/librbd/CMakeLists.txt | 4 +- + src/librbd/ImageCtx.cc | 5 +- + src/librbd/ImageCtx.h | 3 + + src/librbd/cache/SharedPersistentObjectCacher.cc | 61 ++++++++ + src/librbd/cache/SharedPersistentObjectCacher.h | 45 ++++++ + .../SharedPersistentObjectCacherObjectDispatch.cc | 154 +++++++++++++++++++++ + .../SharedPersistentObjectCacherObjectDispatch.h | 127 +++++++++++++++++ + src/librbd/image/OpenRequest.cc | 12 +- + src/librbd/io/Types.h | 1 + + src/os/CacheStore/SyncFile.cc | 110 +++++++++++++++ + src/os/CacheStore/SyncFile.h | 74 ++++++++++ + src/test/librbd/test_mirroring.cc | 1 + + src/test/rbd_mirror/test_ImageReplayer.cc | 2 + + src/test/rbd_mirror/test_fixture.cc | 1 + + src/tools/CMakeLists.txt | 1 + + src/tools/rbd_cache/CMakeLists.txt | 9 ++ + src/tools/rbd_cache/CacheController.cc | 105 ++++++++++++++ + src/tools/rbd_cache/CacheController.hpp | 49 +++++++ + src/tools/rbd_cache/CacheControllerSocket.hpp | 125 +++++++++++++++++ + .../rbd_cache/CacheControllerSocketClient.hpp | 131 ++++++++++++++++++ + src/tools/rbd_cache/CacheControllerSocketCommon.h | 43 ++++++ + src/tools/rbd_cache/ObjectCacheStore.cc | 147 ++++++++++++++++++++ + src/tools/rbd_cache/ObjectCacheStore.h | 65 +++++++++ + src/tools/rbd_cache/main.cc | 85 ++++++++++++ + 25 files changed, 1365 insertions(+), 3 deletions(-) + create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.cc + create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.h + create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc + create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h + create mode 100644 src/os/CacheStore/SyncFile.cc + create mode 100644 src/os/CacheStore/SyncFile.h + create mode 100644 
src/tools/rbd_cache/CMakeLists.txt + create mode 100644 src/tools/rbd_cache/CacheController.cc + create mode 100644 src/tools/rbd_cache/CacheController.hpp + create mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp + create mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp + create mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h + create mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc + create mode 100644 src/tools/rbd_cache/ObjectCacheStore.h + create mode 100644 src/tools/rbd_cache/main.cc + +diff --git a/src/common/options.cc b/src/common/options.cc +index c5afe4c..7839a31 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -6357,6 +6357,14 @@ static std::vector<Option> get_rbd_options() { + .set_default(60) + .set_description("time in seconds for detecting a hung thread"), + ++ Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) ++ .set_default(true) ++ .set_description("whether to enable shared ssd caching"), ++ ++ Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED) ++ .set_default("/tmp") ++ .set_description("shared ssd caching data dir"), ++ + Option("rbd_non_blocking_aio", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) + .set_default(true) + .set_description("process AIO ops from a dispatch thread to prevent blocking"), +diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt +index b9c08d4..92539a8 100644 +--- a/src/librbd/CMakeLists.txt ++++ b/src/librbd/CMakeLists.txt +@@ -32,7 +32,8 @@ set(librbd_internal_srcs + api/Snapshot.cc + cache/ImageWriteback.cc + cache/ObjectCacherObjectDispatch.cc +- cache/PassthroughImageCache.cc ++ cache/SharedPersistentObjectCacherObjectDispatch.cc ++ cache/SharedPersistentObjectCacher.cc + deep_copy/ImageCopyRequest.cc + deep_copy/MetadataCopyRequest.cc + deep_copy/ObjectCopyRequest.cc +@@ -123,6 +124,7 @@ set(librbd_internal_srcs + trash/MoveRequest.cc + watcher/Notifier.cc + watcher/RewatchRequest.cc ++ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc + ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) + + add_library(rbd_api STATIC librbd.cc) +diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc +index 48f98b1..349156b 100644 +--- a/src/librbd/ImageCtx.cc ++++ b/src/librbd/ImageCtx.cc +@@ -776,7 +776,8 @@ public: + "rbd_qos_read_iops_limit", false)( + "rbd_qos_write_iops_limit", false)( + "rbd_qos_read_bps_limit", false)( +- "rbd_qos_write_bps_limit", false); ++ "rbd_qos_write_bps_limit", false)( ++ "rbd_shared_cache_enabled", false); + + ConfigProxy local_config_t{false}; + std::map<std::string, bufferlist> res; +@@ -844,6 +845,8 @@ public: + ASSIGN_OPTION(qos_write_iops_limit, uint64_t); + ASSIGN_OPTION(qos_read_bps_limit, uint64_t); + ASSIGN_OPTION(qos_write_bps_limit, uint64_t); ++ ASSIGN_OPTION(shared_cache_enabled, bool); ++ ASSIGN_OPTION(shared_cache_path, std::string); + + if (thread_safe) { + ASSIGN_OPTION(journal_pool, std::string); +diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h +index d197c24..f661c09 100644 +--- a/src/librbd/ImageCtx.h ++++ b/src/librbd/ImageCtx.h +@@ -204,6 +204,9 @@ namespace librbd { + uint64_t qos_read_bps_limit; + uint64_t qos_write_bps_limit; + ++ bool shared_cache_enabled; ++ std::string shared_cache_path; ++ + LibrbdAdminSocketHook *asok_hook; + + exclusive_lock::Policy *exclusive_lock_policy = nullptr; +diff --git a/src/librbd/cache/SharedPersistentObjectCacher.cc b/src/librbd/cache/SharedPersistentObjectCacher.cc +new file mode 100644 +index 0000000..a849260 +--- 
/dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacher.cc +@@ -0,0 +1,61 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "librbd/cache/SharedPersistentObjectCacher.h" ++#include "include/buffer.h" ++#include "common/dout.h" ++#include "librbd/ImageCtx.h" ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacher: " << this \ ++ << " " << __func__ << ": " ++ ++namespace librbd { ++namespace cache { ++ ++template <typename I> ++SharedPersistentObjectCacher<I>::SharedPersistentObjectCacher(I *image_ctx, std::string cache_path) ++ : m_image_ctx(image_ctx), m_cache_path(cache_path), ++ m_file_map_lock("librbd::cache::SharedObjectCacher::filemaplock") { ++ auto *cct = m_image_ctx->cct; ++ ++} ++ ++template <typename I> ++SharedPersistentObjectCacher<I>::~SharedPersistentObjectCacher() { ++ for(auto &it: file_map) { ++ if(it.second) { ++ delete it.second; ++ } ++ } ++} ++ ++template <typename I> ++int SharedPersistentObjectCacher<I>::read_object(std::string oid, ceph::bufferlist* read_data, uint64_t offset, uint64_t length, Context *on_finish) { ++ ++ auto *cct = m_image_ctx->cct; ++ ldout(cct, 20) << "object: " << oid << dendl; ++ ++ std::string cache_file_name = m_image_ctx->data_ctx.get_pool_name() + oid; ++ ++ //TODO(): make a cache for cachefile fd ++ os::CacheStore::SyncFile* target_cache_file = new os::CacheStore::SyncFile(cct, cache_file_name); ++ target_cache_file->open(); ++ ++ int ret = target_cache_file->read_object_from_file(read_data, offset, length); ++ if (ret < 0) { ++ ldout(cct, 5) << "read from file return error: " << ret ++ << "file name= " << cache_file_name ++ << dendl; ++ } ++ ++ delete target_cache_file; ++ return ret; ++} ++ ++ ++} // namespace cache ++} // namespace librbd ++ ++template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>; +diff --git a/src/librbd/cache/SharedPersistentObjectCacher.h b/src/librbd/cache/SharedPersistentObjectCacher.h +new file mode 100644 +index 0000000..d108a05 +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacher.h +@@ -0,0 +1,45 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER ++#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER ++ ++#include "include/buffer_fwd.h" ++#include "include/int_types.h" ++#include "os/CacheStore/SyncFile.h" ++#include "common/Mutex.h" ++#include <vector> ++#include <map> ++ ++struct Context; ++ ++namespace librbd { ++ ++struct ImageCtx; ++ ++namespace cache { ++ ++template <typename ImageCtxT> ++class SharedPersistentObjectCacher { ++public: ++ ++ SharedPersistentObjectCacher(ImageCtxT *image_ctx, std::string cache_path); ++ ~SharedPersistentObjectCacher(); ++ ++ int read_object(std::string oid, ceph::bufferlist* read_data, ++ uint64_t offset, uint64_t length, Context *on_finish); ++ ++private: ++ ImageCtxT *m_image_ctx; ++ std::map<std::string, os::CacheStore::SyncFile*> file_map; ++ Mutex m_file_map_lock; ++ std::string m_cache_path; ++ ++}; ++ ++} // namespace cache ++} // namespace librbd ++ ++extern template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>; ++ ++#endif // CEPH_LIBRBD_CACHE_FILE_IMAGE_STORE +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +new file mode 100644 +index 
0000000..90d886c +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +@@ -0,0 +1,154 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h" ++#include "common/WorkQueue.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/Journal.h" ++#include "librbd/Utils.h" ++#include "librbd/LibrbdWriteback.h" ++#include "librbd/io/ObjectDispatchSpec.h" ++#include "librbd/io/ObjectDispatcher.h" ++#include "librbd/io/Utils.h" ++#include "osd/osd_types.h" ++#include "osdc/WritebackHandler.h" ++#include <vector> ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \ ++ << this << " " << __func__ << ": " ++ ++namespace librbd { ++namespace cache { ++ ++template <typename I> ++SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch( ++ I* image_ctx) : m_image_ctx(image_ctx) { ++} ++ ++template <typename I> ++SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { ++ if (m_object_store) { ++ delete m_object_store; ++ } ++ ++ if (m_cache_client) { ++ delete m_cache_client; ++ } ++} ++ ++template <typename I> ++void SharedPersistentObjectCacherObjectDispatch<I>::init() { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 5) << dendl; ++ ++ if (m_image_ctx->parent != nullptr) { ++ //TODO(): should we cover multi-leveled clone? ++ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; ++ return; ++ } ++ ++ ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; ++ ++ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; ++ m_cache_client = new CacheClient(io_service, controller_path.c_str(), ++ ([&](std::string s){client_handle_request(s);})); ++ ++ int ret = m_cache_client->connect(); ++ if (ret < 0) { ++ ldout(cct, 5) << "SRO cache client fail to connect with local controller: " ++ << "please start rbd-cache daemon" ++ << dendl; ++ } else { ++ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " ++ << "name = " << m_image_ctx->id ++ << dendl; ++ ++ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), ++ m_image_ctx->id, m_image_ctx->size); ++ ++ if (ret >= 0) { ++ // add ourself to the IO object dispatcher chain ++ m_image_ctx->io_object_dispatcher->register_object_dispatch(this); ++ } ++ } ++} ++ ++template <typename I> ++bool SharedPersistentObjectCacherObjectDispatch<I>::read( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, librados::snap_t snap_id, int op_flags, ++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, ++ io::ExtentMap* extent_map, int* object_dispatch_flags, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ // IO chained in reverse order ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" ++ << object_len << dendl; ++ ++ // ensure we aren't holding the cache lock post-read ++ on_dispatched = util::create_async_context_callback(*m_image_ctx, ++ on_dispatched); ++ ++ if (m_cache_client && m_cache_client->connected && m_object_store) { ++ bool exists; ++ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), ++ m_image_ctx->id, oid, &exists); ++ ++ // try to read from parent image ++ ldout(cct, 20) << "SRO cache object 
exists:" << exists << dendl; ++ if (exists) { ++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); ++ if (r != 0) { ++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; ++ on_dispatched->complete(r); ++ return true; ++ } ++ } ++ } ++ ++ ldout(cct, 20) << "Continue read from RADOS" << dendl; ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ return true; ++} ++ ++template <typename I> ++void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << dendl; ++ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); ++ ++ switch (io_ctx->type) { ++ case RBDSC_REGISTER_REPLY: { ++ // open cache handler for volume ++ ldout(cct, 20) << "SRO cache client open cache handler" << dendl; ++ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); ++ ++ break; ++ } ++ case RBDSC_READ_REPLY: { ++ ldout(cct, 20) << "SRO cache client start to read cache" << dendl; ++ //TODO(): should call read here ++ ++ break; ++ } ++ case RBDSC_READ_RADOS: { ++ ldout(cct, 20) << "SRO cache client start to read rados" << dendl; ++ //TODO(): should call read here ++ ++ break; ++ } ++ default: ldout(cct, 20) << "nothing" << dendl; ++ break; ++ ++ } ++} ++ ++} // namespace cache ++} // namespace librbd ++ ++template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +new file mode 100644 +index 0000000..1ede804 +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +@@ -0,0 +1,127 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H ++#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H ++ ++#include "librbd/io/ObjectDispatchInterface.h" ++#include "common/Mutex.h" ++#include "osdc/ObjectCacher.h" ++#include "tools/rbd_cache/CacheControllerSocketClient.hpp" ++#include "SharedPersistentObjectCacher.h" ++ ++struct WritebackHandler; ++ ++namespace librbd { ++ ++class ImageCtx; ++ ++namespace cache { ++ ++/** ++ * Facade around the OSDC object cacher to make it align with ++ * the object dispatcher interface ++ */ ++template <typename ImageCtxT = ImageCtx> ++class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface { ++public: ++ static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) { ++ return new SharedPersistentObjectCacherObjectDispatch(image_ctx); ++ } ++ ++ SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx); ++ ~SharedPersistentObjectCacherObjectDispatch() override; ++ ++ io::ObjectDispatchLayer get_object_dispatch_layer() const override { ++ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; ++ } ++ ++ void init(); ++ void shut_down(Context* on_finish) { ++ m_image_ctx->op_work_queue->queue(on_finish, 0); ++ } ++ ++ bool read( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, librados::snap_t snap_id, int op_flags, ++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, ++ io::ExtentMap* extent_map, int* object_dispatch_flags, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) override; ++ ++ bool 
discard( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, const ::SnapContext &snapc, int discard_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool write( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool write_same( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, io::Extents&& buffer_extents, ++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool compare_and_write( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, ++ const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, ++ int* object_dispatch_flags, uint64_t* journal_tid, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ return false; ++ } ++ ++ bool flush( ++ io::FlushSource flush_source, const ZTracer::Trace &parent_trace, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ return false; ++ } ++ ++ bool invalidate_cache(Context* on_finish) { ++ return false; ++ } ++ ++ bool reset_existence_cache(Context* on_finish) { ++ return false; ++ } ++ ++ void extent_overwritten( ++ uint64_t object_no, uint64_t object_off, uint64_t object_len, ++ uint64_t journal_tid, uint64_t new_journal_tid) { ++ } ++ ++ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; ++ ++private: ++ ++ ImageCtxT* m_image_ctx; ++ ++ void client_handle_request(std::string msg); ++ CacheClient *m_cache_client = nullptr; ++ boost::asio::io_service io_service; ++}; ++ ++} // namespace cache ++} // namespace librbd ++ ++extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; ++ ++#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H +diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc +index ae18739..30a7b66 100644 +--- a/src/librbd/image/OpenRequest.cc ++++ b/src/librbd/image/OpenRequest.cc +@@ -8,6 +8,7 @@ + #include "librbd/ImageCtx.h" + #include "librbd/Utils.h" + #include "librbd/cache/ObjectCacherObjectDispatch.h" ++#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc" + #include "librbd/image/CloseRequest.h" + #include "librbd/image/RefreshRequest.h" + #include "librbd/image/SetSnapRequest.h" +@@ -448,12 +449,21 @@ Context *OpenRequest<I>::handle_refresh(int *result) { + + template <typename I> + Context *OpenRequest<I>::send_init_cache(int *result) { ++ ++ CephContext *cct = m_image_ctx->cct; + // cache is disabled or parent image context + if (!m_image_ctx->cache || m_image_ctx->child != nullptr) { ++ ++ // enable Shared Read-only cache for parent image ++ if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) { ++ ldout(cct, 10) << this << " " << "setting up 
parent cache"<< dendl; ++ auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx); ++ sro_cache->init(); ++ } ++ + return send_register_watch(result); + } + +- CephContext *cct = m_image_ctx->cct; + ldout(cct, 10) << this << " " << __func__ << dendl; + + auto cache = cache::ObjectCacherObjectDispatch<I>::create(m_image_ctx); +diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h +index 7e09c90..ef3049f 100644 +--- a/src/librbd/io/Types.h ++++ b/src/librbd/io/Types.h +@@ -59,6 +59,7 @@ enum DispatchResult { + enum ObjectDispatchLayer { + OBJECT_DISPATCH_LAYER_NONE = 0, + OBJECT_DISPATCH_LAYER_CACHE, ++ OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE, + OBJECT_DISPATCH_LAYER_JOURNAL, + OBJECT_DISPATCH_LAYER_CORE, + OBJECT_DISPATCH_LAYER_LAST +diff --git a/src/os/CacheStore/SyncFile.cc b/src/os/CacheStore/SyncFile.cc +new file mode 100644 +index 0000000..5352bde +--- /dev/null ++++ b/src/os/CacheStore/SyncFile.cc +@@ -0,0 +1,110 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "os/CacheStore/SyncFile.h" ++#include "include/Context.h" ++#include "common/dout.h" ++#include "common/WorkQueue.h" ++#include "librbd/ImageCtx.h" ++#include <sys/types.h> ++#include <sys/stat.h> ++#include <aio.h> ++#include <errno.h> ++#include <fcntl.h> ++#include <utility> ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::file::SyncFile: " << this << " " \ ++ << __func__ << ": " ++ ++namespace os { ++namespace CacheStore { ++ ++SyncFile::SyncFile(CephContext *cct, const std::string &name) ++ : cct(cct) ++{ ++ m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; ++ ldout(cct, 20) << "file path=" << m_name << dendl; ++} ++ ++SyncFile::~SyncFile() ++{ ++ // TODO force proper cleanup ++ if (m_fd != -1) { ++ ::close(m_fd); ++ } ++} ++ ++void SyncFile::open(Context *on_finish) ++{ ++ while (true) { ++ m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC, ++ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); ++ if (m_fd == -1) { ++ int r = -errno; ++ if (r == -EINTR) { ++ continue; ++ } ++ on_finish->complete(r); ++ return; ++ } ++ break; ++ } ++ ++ on_finish->complete(0); ++} ++ ++void SyncFile::open() ++{ ++ while (true) ++ { ++ m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC, ++ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); ++ if (m_fd == -1) ++ { ++ int r = -errno; ++ if (r == -EINTR) { ++ continue; ++ } ++ return; ++ } ++ break; ++ } ++} ++ ++int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) { ++ ++ ldout(cct, 20) << "cache file name:" << m_name ++ << ", length:" << object_len << dendl; ++ ++ // TODO(): aio ++ int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0); ++ if(ret < 0) { ++ lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl; ++ return ret; ++ } ++ ++ return ret; ++} ++ ++int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) { ++ ++ ldout(cct, 20) << "offset:" << object_off ++ << ", length:" << object_len << dendl; ++ ++ bufferptr buf(object_len); ++ ++ // TODO(): aio ++ int ret = pread(m_fd, buf.c_str(), object_len, object_off); ++ if(ret < 0) { ++ lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl; ++ return ret; ++ } ++ read_buf->append(std::move(buf)); ++ ++ return ret; ++} ++ ++} // namespace CacheStore ++} // namespace os +diff --git a/src/os/CacheStore/SyncFile.h 
b/src/os/CacheStore/SyncFile.h +new file mode 100644 +index 0000000..81602ce +--- /dev/null ++++ b/src/os/CacheStore/SyncFile.h +@@ -0,0 +1,74 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBOS_CACHE_STORE_SYNC_FILE ++#define CEPH_LIBOS_CACHE_STORE_SYNC_FILE ++ ++#include "include/buffer_fwd.h" ++#include <sys/mman.h> ++#include <string> ++ ++struct Context; ++struct ContextWQ; ++class CephContext; ++ ++namespace os { ++ ++namespace CacheStore { ++ ++class SyncFile { ++public: ++ SyncFile(CephContext *cct, const std::string &name); ++ ~SyncFile(); ++ ++ // TODO use IO queue instead of individual commands so operations can be ++ // submitted in batch ++ ++ // TODO use scatter/gather API ++ ++ void open(Context *on_finish); ++ ++ // ## ++ void open(); ++ bool try_open(); ++ void close(Context *on_finish); ++ void remove(Context *on_finish); ++ ++ void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish); ++ ++ void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish); ++ ++ void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish); ++ ++ void truncate(uint64_t length, bool fdatasync, Context *on_finish); ++ ++ void fsync(Context *on_finish); ++ ++ void fdatasync(Context *on_finish); ++ ++ uint64_t filesize(); ++ ++ int load(void** dest, uint64_t filesize); ++ ++ int remove(); ++ ++ // ## ++ int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len); ++ int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len); ++ ++private: ++ CephContext *cct; ++ std::string m_name; ++ int m_fd = -1; ++ ++ int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync); ++ int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl); ++ int discard(uint64_t offset, uint64_t length, bool fdatasync); ++ int truncate(uint64_t length, bool fdatasync); ++ int fdatasync(); ++}; ++ ++} // namespace CacheStore ++} // namespace os ++ ++#endif // CEPH_LIBOS_CACHE_STORE_SYNC_FILE +diff --git a/src/test/librbd/test_mirroring.cc b/src/test/librbd/test_mirroring.cc +index b4fdeae..d7d1aa6 100644 +--- a/src/test/librbd/test_mirroring.cc ++++ b/src/test/librbd/test_mirroring.cc +@@ -47,6 +47,7 @@ public: + + void SetUp() override { + ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx)); ++ ASSERT_EQ(0, _rados.conf_set("rbd_shared_cache_enabled", "false")); + } + + std::string image_name = "mirrorimg1"; +diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc +index 8a95a65..b5598bd 100644 +--- a/src/test/rbd_mirror/test_ImageReplayer.cc ++++ b/src/test/rbd_mirror/test_ImageReplayer.cc +@@ -90,6 +90,7 @@ public: + EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get())); + EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false")); + EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_poll_age", "1")); ++ EXPECT_EQ(0, m_local_cluster->conf_set("rbd_shared_cache_enabled", "false")); + + m_local_pool_name = get_temp_pool_name(); + EXPECT_EQ(0, m_local_cluster->pool_create(m_local_pool_name.c_str())); +@@ -99,6 +100,7 @@ public: + + EXPECT_EQ("", connect_cluster_pp(m_remote_cluster)); + EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_cache", "false")); ++ EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_shared_cache_enabled", "false")); + + m_remote_pool_name = get_temp_pool_name(); + EXPECT_EQ(0, 
m_remote_cluster.pool_create(m_remote_pool_name.c_str())); +diff --git a/src/test/rbd_mirror/test_fixture.cc b/src/test/rbd_mirror/test_fixture.cc +index b2a51ca..9e77098 100644 +--- a/src/test/rbd_mirror/test_fixture.cc ++++ b/src/test/rbd_mirror/test_fixture.cc +@@ -27,6 +27,7 @@ void TestFixture::SetUpTestCase() { + _rados = std::shared_ptr<librados::Rados>(new librados::Rados()); + ASSERT_EQ("", connect_cluster_pp(*_rados.get())); + ASSERT_EQ(0, _rados->conf_set("rbd_cache", "false")); ++ ASSERT_EQ(0, _rados->conf_set("rbd_shared_cache_enabled", "false")); + + _local_pool_name = get_temp_pool_name("test-rbd-mirror-"); + ASSERT_EQ(0, _rados->pool_create(_local_pool_name.c_str())); +diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt +index 3789e3c..72ab342 100644 +--- a/src/tools/CMakeLists.txt ++++ b/src/tools/CMakeLists.txt +@@ -99,6 +99,7 @@ endif(WITH_CEPHFS) + if(WITH_RBD) + add_subdirectory(rbd) + add_subdirectory(rbd_mirror) ++ add_subdirectory(rbd_cache) + if(LINUX) + add_subdirectory(rbd_nbd) + endif() +diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt +new file mode 100644 +index 0000000..08eae60 +--- /dev/null ++++ b/src/tools/rbd_cache/CMakeLists.txt +@@ -0,0 +1,9 @@ ++add_executable(rbd-cache ++ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc ++ ObjectCacheStore.cc ++ CacheController.cc ++ main.cc) ++target_link_libraries(rbd-cache ++ librados ++ global) ++install(TARGETS rbd-cache DESTINATION bin) +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +new file mode 100644 +index 0000000..c914358 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheController.cc +@@ -0,0 +1,105 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "CacheController.hpp" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_rbd_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ ++ << __func__ << ": " ++ ++ ++class ThreadPoolSingleton : public ThreadPool { ++public: ++ ContextWQ *op_work_queue; ++ ++ explicit ThreadPoolSingleton(CephContext *cct) ++ : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, ++ "pcache_threads"), ++ op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", ++ cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"), ++ this)) { ++ start(); ++ } ++ ~ThreadPoolSingleton() override { ++ op_work_queue->drain(); ++ delete op_work_queue; ++ ++ stop(); ++ } ++}; ++ ++ ++CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): ++ m_args(args), m_cct(cct) { ++ ++} ++ ++CacheController::~CacheController() { ++ ++} ++ ++int CacheController::init() { ++ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( ++ "rbd::cache::thread_pool", false, m_cct); ++ pcache_op_work_queue = thread_pool_singleton->op_work_queue; ++ ++ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); ++ int r = m_object_cache_store->init(false); ++ if (r < 0) { ++ //derr << "init error\n" << dendl; ++ } ++ return r; ++} ++ ++int CacheController::shutdown() { ++ int r = m_object_cache_store->shutdown(); ++ return r; ++} ++ ++void CacheController::handle_signal(int signum){} ++ ++void CacheController::run() { ++ try { ++ //TODO(): use new socket path ++ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; ++ std::remove(controller_path.c_str()); ++ 
++ m_cache_server = new CacheServer(io_service, controller_path, ++ ([&](uint64_t p, std::string s){handle_request(p, s);})); ++ io_service.run(); ++ } catch (std::exception& e) { ++ std::cerr << "Exception: " << e.what() << "\n"; ++ } ++} ++ ++void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); ++ ++ int ret = 0; ++ ++ switch (io_ctx->type) { ++ case RBDSC_REGISTER: { ++ // init cache layout for volume ++ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); ++ io_ctx->type = RBDSC_REGISTER_REPLY; ++ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); ++ ++ break; ++ } ++ case RBDSC_READ: { ++ // lookup object in local cache store ++ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); ++ if (ret < 0) { ++ io_ctx->type = RBDSC_READ_RADOS; ++ } else { ++ io_ctx->type = RBDSC_READ_REPLY; ++ } ++ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); ++ ++ break; ++ } ++ ++ } ++} +diff --git a/src/tools/rbd_cache/CacheController.hpp b/src/tools/rbd_cache/CacheController.hpp +new file mode 100644 +index 0000000..97113e4 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheController.hpp +@@ -0,0 +1,49 @@ ++#ifndef CACHE_CONTROLLER_H ++#define CACHE_CONTROLLER_H ++ ++#include <thread> ++ ++#include "common/Formatter.h" ++#include "common/admin_socket.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "common/WorkQueue.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "include/assert.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++ ++#include "CacheControllerSocket.hpp" ++#include "ObjectCacheStore.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++class CacheController { ++ public: ++ CacheController(CephContext *cct, const std::vector<const char*> &args); ++ ~CacheController(); ++ ++ int init(); ++ ++ int shutdown(); ++ ++ void handle_signal(int sinnum); ++ ++ void run(); ++ ++ void handle_request(uint64_t sesstion_id, std::string msg); ++ ++ private: ++ boost::asio::io_service io_service; ++ CacheServer *m_cache_server; ++ std::vector<const char*> m_args; ++ CephContext *m_cct; ++ ObjectCacheStore *m_object_cache_store; ++ ContextWQ* pcache_op_work_queue; ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +new file mode 100644 +index 0000000..6e1a743 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp +@@ -0,0 +1,125 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CACHE_CONTROLLER_SOCKET_H ++#define CACHE_CONTROLLER_SOCKET_H ++ ++#include <cstdio> ++#include <iostream> ++#include <array> ++#include <memory> ++#include <string> ++#include <boost/bind.hpp> ++#include <boost/asio.hpp> ++#include <boost/algorithm/string.hpp> ++#include "CacheControllerSocketCommon.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++class session : public std::enable_shared_from_this<session> { ++public: ++ session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) ++ : session_id(session_id), socket_(io_service), process_msg(processmsg) {} ++ ++ stream_protocol::socket& socket() { ++ return socket_; ++ } ++ ++ void start() { ++ ++ boost::asio::async_read(socket_, boost::asio::buffer(data_), ++ 
boost::asio::transfer_exactly(544), ++ boost::bind(&session::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ ++ } ++ ++ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { ++ ++ if (!error) { ++ ++ process_msg(session_id, std::string(data_, bytes_transferred)); ++ ++ } ++ } ++ ++ void handle_write(const boost::system::error_code& error) { ++ if (!error) { ++ socket_.async_read_some(boost::asio::buffer(data_), ++ boost::bind(&session::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ } ++ } ++ ++ void send(std::string msg) { ++ ++ boost::asio::async_write(socket_, ++ boost::asio::buffer(msg.c_str(), msg.size()), ++ boost::bind(&session::handle_write, ++ shared_from_this(), ++ boost::asio::placeholders::error)); ++ ++ } ++ ++private: ++ uint64_t session_id; ++ stream_protocol::socket socket_; ++ ProcessMsg process_msg; ++ ++ // Buffer used to store data received from the client. ++ //std::array<char, 1024> data_; ++ char data_[1024]; ++}; ++ ++typedef std::shared_ptr<session> session_ptr; ++ ++class CacheServer { ++public: ++ CacheServer(boost::asio::io_service& io_service, ++ const std::string& file, ProcessMsg processmsg) ++ : io_service_(io_service), ++ server_process_msg(processmsg), ++ acceptor_(io_service, stream_protocol::endpoint(file)) ++ { ++ session_ptr new_session(new session(session_id, io_service_, server_process_msg)); ++ acceptor_.async_accept(new_session->socket(), ++ boost::bind(&CacheServer::handle_accept, this, new_session, ++ boost::asio::placeholders::error)); ++ } ++ ++ void handle_accept(session_ptr new_session, ++ const boost::system::error_code& error) ++ { ++ //TODO(): open librbd snap ++ if (!error) { ++ new_session->start(); ++ session_map.emplace(session_id, new_session); ++ session_id++; ++ new_session.reset(new session(session_id, io_service_, server_process_msg)); ++ acceptor_.async_accept(new_session->socket(), ++ boost::bind(&CacheServer::handle_accept, this, new_session, ++ boost::asio::placeholders::error)); ++ } ++ } ++ ++ void send(uint64_t session_id, std::string msg) { ++ auto it = session_map.find(session_id); ++ if (it != session_map.end()) { ++ it->second->send(msg); ++ } ++ } ++ ++private: ++ boost::asio::io_service& io_service_; ++ ProcessMsg server_process_msg; ++ stream_protocol::acceptor acceptor_; ++ uint64_t session_id = 1; ++ std::map<uint64_t, session_ptr> session_map; ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +new file mode 100644 +index 0000000..8e61aa9 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -0,0 +1,131 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H ++#define CACHE_CONTROLLER_SOCKET_CLIENT_H ++ ++#include <boost/asio.hpp> ++#include <boost/bind.hpp> ++#include <boost/algorithm/string.hpp> ++#include "include/assert.h" ++#include "CacheControllerSocketCommon.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++class CacheClient { ++public: ++ CacheClient(boost::asio::io_service& io_service, ++ const std::string& file, ClientProcessMsg processmsg) ++ : io_service_(io_service), ++ io_service_work_(io_service), ++ socket_(io_service), ++ m_client_process_msg(processmsg), ++ ep_(stream_protocol::endpoint(file)) ++ { 
++ std::thread thd([this](){io_service_.run(); }); ++ thd.detach(); ++ } ++ ++ void run(){ ++ } ++ ++ int connect() { ++ try { ++ socket_.connect(ep_); ++ } catch (std::exception& e) { ++ return -1; ++ } ++ connected = true; ++ return 0; ++ } ++ ++ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { ++ // cache controller will init layout ++ rbdsc_req_type_t *message = new rbdsc_req_type_t(); ++ message->type = RBDSC_REGISTER; ++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); ++ memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); ++ message->vol_size = vol_size; ++ message->offset = 0; ++ message->length = 0; ++ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), ++ [this](const boost::system::error_code& err, size_t cb) { ++ if (!err) { ++ boost::asio::async_read(socket_, boost::asio::buffer(buffer_), ++ boost::asio::transfer_exactly(544), ++ [this](const boost::system::error_code& err, size_t cb) { ++ if (!err) { ++ m_client_process_msg(std::string(buffer_, cb)); ++ } else { ++ return -1; ++ } ++ }); ++ } else { ++ return -1; ++ } ++ }); ++ ++ return 0; ++ } ++ ++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) { ++ rbdsc_req_type_t *message = new rbdsc_req_type_t(); ++ message->type = RBDSC_READ; ++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); ++ memcpy(message->vol_name, object_id.c_str(), object_id.size()); ++ message->vol_size = 0; ++ message->offset = 0; ++ message->length = 0; ++ ++ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), ++ [this, result](const boost::system::error_code& err, size_t cb) { ++ if (!err) { ++ get_result(result); ++ } else { ++ return -1; ++ } ++ }); ++ std::unique_lock<std::mutex> lk(m); ++ cv.wait(lk); ++ return 0; ++ } ++ ++ void get_result(bool* result) { ++ boost::asio::async_read(socket_, boost::asio::buffer(buffer_), ++ boost::asio::transfer_exactly(544), ++ [this, result](const boost::system::error_code& err, size_t cb) { ++ if (!err) { ++ *result = true; ++ cv.notify_one(); ++ m_client_process_msg(std::string(buffer_, cb)); ++ } else { ++ return -1; ++ } ++ }); ++ } ++ ++ void handle_connect(const boost::system::error_code& error) { ++ //TODO(): open librbd snap ++ } ++ ++ void handle_write(const boost::system::error_code& error) { ++ } ++ ++private: ++ boost::asio::io_service& io_service_; ++ boost::asio::io_service::work io_service_work_; ++ stream_protocol::socket socket_; ++ ClientProcessMsg m_client_process_msg; ++ stream_protocol::endpoint ep_; ++ char buffer_[1024]; ++ int block_size_ = 1024; ++ ++ std::condition_variable cv; ++ std::mutex m; ++ ++public: ++ bool connected = false; ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +new file mode 100644 +index 0000000..e253bb1 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h +@@ -0,0 +1,43 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H ++#define CACHE_CONTROLLER_SOCKET_COMMON_H ++ ++#define RBDSC_REGISTER 0X11 ++#define RBDSC_READ 0X12 ++#define RBDSC_LOOKUP 0X13 ++#define RBDSC_REGISTER_REPLY 0X14 ++#define RBDSC_READ_REPLY 0X15 ++#define RBDSC_LOOKUP_REPLY 0X16 ++#define RBDSC_READ_RADOS 0X16 ++ ++typedef std::function<void(uint64_t, std::string)> ProcessMsg; ++typedef 
std::function<void(std::string)> ClientProcessMsg; ++typedef uint8_t rbdsc_req_type; ++struct rbdsc_req_type_t { ++ rbdsc_req_type type; ++ uint64_t vol_size; ++ uint64_t offset; ++ uint64_t length; ++ char pool_name[256]; ++ char vol_name[256]; ++ ++ uint64_t size() { ++ return sizeof(rbdsc_req_type_t); ++ } ++ ++ std::string to_buffer() { ++ std::stringstream ss; ++ ss << type; ++ ss << vol_size; ++ ss << offset; ++ ss << length; ++ ss << pool_name; ++ ss << vol_name; ++ ++ return ss.str(); ++ } ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc +new file mode 100644 +index 0000000..90b407c +--- /dev/null ++++ b/src/tools/rbd_cache/ObjectCacheStore.cc +@@ -0,0 +1,147 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "ObjectCacheStore.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_rbd_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ ++ << __func__ << ": " ++ ++ ++ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) ++ : m_cct(cct), m_work_queue(work_queue), ++ m_cache_table_lock("rbd::cache::ObjectCacheStore"), ++ m_rados(new librados::Rados()) { ++} ++ ++ObjectCacheStore::~ObjectCacheStore() { ++ ++} ++ ++int ObjectCacheStore::init(bool reset) { ++ ++ int ret = m_rados->init_with_context(m_cct); ++ if(ret < 0) { ++ lderr(m_cct) << "fail to init Ceph context" << dendl; ++ return ret; ++ } ++ ++ ret = m_rados->connect(); ++ if(ret < 0 ) { ++ lderr(m_cct) << "fail to conect to cluster" << dendl; ++ return ret; ++ } ++ //TODO(): check existing cache objects ++ return ret; ++} ++ ++int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { ++ int ret = 0; ++ std::string cache_file_name = pool_name + object_name; ++ ++ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { ++ librados::IoCtx* io_ctx = new librados::IoCtx(); ++ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); ++ if (ret < 0) { ++ lderr(m_cct) << "fail to create ioctx" << dendl; ++ assert(0); ++ } ++ m_ioctxs.emplace(pool_name, io_ctx); ++ } ++ ++ assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); ++ ++ librados::IoCtx* ioctx = m_ioctxs[pool_name]; ++ ++ //promoting: update metadata ++ { ++ Mutex::Locker locker(m_cache_table_lock); ++ m_cache_table.emplace(cache_file_name, PROMOTING); ++ } ++ ++ librados::bufferlist read_buf; ++ int object_size = 4096*1024; //TODO(): read config from image metadata ++ ++ //TODO(): async promote ++ ret = promote_object(ioctx, object_name, read_buf, object_size); ++ if (ret == -ENOENT) { ++ read_buf.append(std::string(object_size, '0')); ++ ret = 0; ++ } ++ ++ if( ret < 0) { ++ lderr(m_cct) << "fail to read from rados" << dendl; ++ return ret; ++ } ++ ++ // persistent to cache ++ os::CacheStore::SyncFile cache_file(m_cct, cache_file_name); ++ cache_file.open(); ++ ret = cache_file.write_object_to_file(read_buf, object_size); ++ ++ assert(m_cache_table.find(cache_file_name) != m_cache_table.end()); ++ ++ // update metadata ++ { ++ Mutex::Locker locker(m_cache_table_lock); ++ m_cache_table.emplace(cache_file_name, PROMOTED); ++ } ++ ++ return ret; ++ ++} ++ ++int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { ++ ++ std::string cache_file_name = pool_name + object_name; ++ { ++ Mutex::Locker locker(m_cache_table_lock); ++ ++ auto it = m_cache_table.find(cache_file_name); ++ if (it != 
m_cache_table.end()) { ++ ++ if (it->second == PROMOTING) { ++ return -1; ++ } else if (it->second == PROMOTED) { ++ return 0; ++ } else { ++ assert(0); ++ } ++ } ++ } ++ ++ int ret = do_promote(pool_name, object_name); ++ ++ return ret; ++} ++ ++int ObjectCacheStore::shutdown() { ++ m_rados->shutdown(); ++ return 0; ++} ++ ++int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { ++ return 0; ++} ++ ++int ObjectCacheStore::lock_cache(std::string vol_name) { ++ return 0; ++} ++ ++int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist read_buf, uint64_t read_len) { ++ int ret; ++ ++ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); ++ ++ ret = ioctx->aio_read(object_name, read_completion, &read_buf, read_len, 0); ++ if(ret < 0) { ++ lderr(m_cct) << "fail to read from rados" << dendl; ++ return ret; ++ } ++ read_completion->wait_for_complete(); ++ ret = read_completion->get_return_value(); ++ return ret; ++ ++} +diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h +new file mode 100644 +index 0000000..12f8399 +--- /dev/null ++++ b/src/tools/rbd_cache/ObjectCacheStore.h +@@ -0,0 +1,65 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef OBJECT_CACHE_STORE_H ++#define OBJECT_CACHE_STORE_H ++ ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++#include "os/CacheStore/SyncFile.h" ++ ++using librados::Rados; ++using librados::IoCtx; ++ ++typedef shared_ptr<librados::Rados> RadosRef; ++typedef shared_ptr<librados::IoCtx> IoCtxRef; ++ ++class ObjectCacheStore ++{ ++ public: ++ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); ++ ~ObjectCacheStore(); ++ ++ int init(bool reset); ++ ++ int shutdown(); ++ ++ int lookup_object(std::string pool_name, std::string object_name); ++ ++ int init_cache(std::string vol_name, uint64_t vol_size); ++ ++ int lock_cache(std::string vol_name); ++ ++ private: ++ int _evict_object(); ++ ++ int do_promote(std::string pool_name, std::string object_name); ++ ++ int promote_object(librados::IoCtx*, std::string object_name, ++ librados::bufferlist read_buf, ++ uint64_t length); ++ ++ enum { ++ PROMOTING = 0, ++ PROMOTED, ++ }; ++ ++ CephContext *m_cct; ++ ContextWQ* m_work_queue; ++ Mutex m_cache_table_lock; ++ RadosRef m_rados; ++ ++ std::map<std::string, uint8_t> m_cache_table; ++ ++ std::map<std::string, librados::IoCtx*> m_ioctxs; ++ ++ os::CacheStore::SyncFile *m_cache_file; ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc +new file mode 100644 +index 0000000..336a581 +--- /dev/null ++++ b/src/tools/rbd_cache/main.cc +@@ -0,0 +1,85 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/ceph_argparse.h" ++#include "common/config.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "global/global_init.h" ++#include "global/signal_handler.h" ++#include "CacheController.hpp" ++ ++#include <vector> ++ ++CacheController *cachectl = nullptr; ++ ++void usage() { ++ std::cout << "usage: cache controller [options...]" << std::endl; ++ std::cout << "options:\n"; ++ std::cout << " -m monaddress[:port] connect to specified monitor\n"; 
++ std::cout << " --keyring=<path> path to keyring for local cluster\n"; ++ std::cout << " --log-file=<logfile> file to log debug output\n"; ++ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; ++ generic_server_usage(); ++} ++ ++static void handle_signal(int signum) ++{ ++ if (cachectl) ++ cachectl->handle_signal(signum); ++} ++ ++int main(int argc, const char **argv) ++{ ++ std::vector<const char*> args; ++ env_to_vec(args); ++ argv_to_vec(argc, argv, args); ++ ++ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, ++ CODE_ENVIRONMENT_DAEMON, ++ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); ++ ++ for (auto i = args.begin(); i != args.end(); ++i) { ++ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { ++ usage(); ++ return EXIT_SUCCESS; ++ } ++ } ++ ++ if (g_conf->daemonize) { ++ global_init_daemonize(g_ceph_context); ++ } ++ g_ceph_context->enable_perf_counter(); ++ ++ common_init_finish(g_ceph_context); ++ ++ init_async_signal_handler(); ++ register_async_signal_handler(SIGHUP, sighup_handler); ++ register_async_signal_handler_oneshot(SIGINT, handle_signal); ++ register_async_signal_handler_oneshot(SIGTERM, handle_signal); ++ ++ std::vector<const char*> cmd_args; ++ argv_to_vec(argc, argv, cmd_args); ++ ++ // disable unnecessary librbd cache ++ g_ceph_context->_conf->set_val_or_die("rbd_cache", "false"); ++ ++ cachectl = new CacheController(g_ceph_context, cmd_args); ++ int r = cachectl->init(); ++ if (r < 0) { ++ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; ++ goto cleanup; ++ } ++ ++ cachectl->run(); ++ ++ cleanup: ++ unregister_async_signal_handler(SIGHUP, sighup_handler); ++ unregister_async_signal_handler(SIGINT, handle_signal); ++ unregister_async_signal_handler(SIGTERM, handle_signal); ++ shutdown_async_signal_handler(); ++ ++ delete cachectl; ++ ++ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; ++} +-- +2.7.4 + |
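
As a reading aid, the following is a minimal, self-contained sketch of the daemon-side lookup decision that step 2 of the commit message describes. It is a model only, not code from the patch: CacheDaemonModel, ObjectState, LookupReply, and policy_allows_promotion are illustrative names, and the corresponding logic in the patch lives in ObjectCacheStore::lookup_object() / do_promote() and CacheController::handle_request().

// Standalone model of the lookup handshake between librbd and the cache
// daemon.  Illustrative only; names do not appear in the patch above.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

enum class ObjectState : uint8_t { kPromoting, kPromoted };
enum class LookupReply : uint8_t { kReadFromCache, kReadFromRados };

class CacheDaemonModel {
public:
  // Serve from the shared cache only once an object is fully promoted;
  // an object still being promoted is answered with "read_from_rados".
  LookupReply lookup(const std::string& pool, const std::string& object) {
    const std::string key = pool + "/" + object;
    auto it = table_.find(key);
    if (it != table_.end()) {
      return it->second == ObjectState::kPromoted ? LookupReply::kReadFromCache
                                                  : LookupReply::kReadFromRados;
    }
    if (policy_allows_promotion(key)) {
      // The daemon in the patch promotes synchronously at this point: it
      // marks the entry PROMOTING, copies the object from RADOS into a local
      // cache file, then marks it PROMOTED and acks "read_from_cache".
      table_[key] = ObjectState::kPromoted;
      return LookupReply::kReadFromCache;
    }
    return LookupReply::kReadFromRados;
  }

private:
  // Placeholder promotion policy; the patch leaves the real policy as a TODO.
  bool policy_allows_promotion(const std::string&) const { return true; }

  std::map<std::string, ObjectState> table_;
};

int main() {
  CacheDaemonModel daemon;
  // Repeated lookups of the same object: the first one triggers promotion,
  // later ones are served from the shared cache.
  for (int i = 0; i < 2; ++i) {
    auto reply = daemon.lookup("rbd", "rbd_data.1234.0000000000000000");
    std::cout << (reply == LookupReply::kReadFromCache ? "read_from_cache"
                                                       : "read_from_rados")
              << "\n";
  }
  return 0;
}

In the patch itself this handshake travels over a UNIX domain socket (/tmp/rbd_shared_readonly_cache_demo) as fixed-size rbdsc_req_type_t messages, with RBDSC_READ_REPLY and RBDSC_READ_RADOS playing the roles of the two reply values above.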