diff options
author | Qiaowei Ren <qiaowei.ren@intel.com> | 2018-09-11 10:54:20 +0800 |
---|---|---|
committer | Qiaowei Ren <qiaowei.ren@intel.com> | 2018-09-11 10:54:20 +0800 |
commit | d65e22d27ab305d38059046dae60d7a66ff4a4e0 (patch) | |
tree | d0bd06459534ef33d0b930f6c164c4501bd527d7 | |
parent | 828acdd1d5c5c2aeef287aa69e473bf44fcbce70 (diff) |
ceph: shared persistent read-only rbd cache
This patch introduces introduces RBD shared persistent RO cache which
can provide client-side sharing cache for rbd clone/snapshot case.
Change-Id: Icad8063f4f10b1ab4ce31920e90d5affa7d0abdc
Signed-off-by: Qiaowei Ren <qiaowei.ren@intel.com>
Signed-off-by: Dehao Shang <dehao.shang@intel.com>
Signed-off-by: Tushar Gohad <tushar.gohad@intel.com>
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
-rw-r--r-- | src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch | 1685 | ||||
-rw-r--r-- | src/ceph/0002-librbd-cleanup-rbd-shared-RO-cache.patch | 847 | ||||
-rw-r--r-- | src/ceph/0003-librbd-fix-bufferlist-point.patch | 71 | ||||
-rw-r--r-- | src/ceph/0004-librbd-fix-lookup-object-return.patch | 45 | ||||
-rw-r--r-- | src/ceph/0005-librbd-fix-conf-get_val.patch | 63 | ||||
-rw-r--r-- | src/ceph/0006-librbd-LRU-policy-based-eviction.patch | 403 | ||||
-rw-r--r-- | src/ceph/0007-librbd-cleanup-policy-based-promotion-eviction.patch | 512 | ||||
-rw-r--r-- | src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch | 366 | ||||
-rw-r--r-- | src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch | 767 | ||||
-rw-r--r-- | src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch | 3510 | ||||
-rw-r--r-- | src/ceph/ceph.rc | 13 |
11 files changed, 8281 insertions, 1 deletions
diff --git a/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch b/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch new file mode 100644 index 0000000..0476086 --- /dev/null +++ b/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch @@ -0,0 +1,1685 @@ +From b7b81562c76011abe05930330915a5ba423964e4 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Thu, 19 Apr 2018 22:54:36 +0800 +Subject: [PATCH 01/10] librbd: shared persistent read-only rbd cache + +This patch introduces introduces RBD shared persistent RO cache which +can provide client-side sharing cache for rbd clone/snapshot case. + +The key componenets are: + +- RBD cache daemon runs on each compute node to control the shared cache state + +- Read-only blocks from parent image(s) are cached in a shared area on + compute node(s) + +- Object level dispatcher inside librbd that can do RPC with cache daemon to + lookup the cache + +- Reads are served from the shared cache until the first COW request + +- Policy to control promotion/evication of the shared cache + +The general IO flow is: + +0) Parent image would register themselfs when initializing + +1) When read request on cloned image flows to parent image, it will check with + the cache daemon if the rarget object is ready + +2) Cache daemon receives the lookup request: + a) if the target object is promoted, daemon will ack with "read_from_cache" + b) if it is not promoted, daemon will check the policy whether to promote: + - if yes, daemon will do the promiton then ack with "read_from_cache" + - if no, daemon will ack with "read_from_rados" + +3) the read reqeust contines to do read from cache/rados based on the ack + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/common/options.cc | 8 ++ + src/librbd/CMakeLists.txt | 4 +- + src/librbd/ImageCtx.cc | 5 +- + src/librbd/ImageCtx.h | 3 + + src/librbd/cache/SharedPersistentObjectCacher.cc | 61 ++++++++ + src/librbd/cache/SharedPersistentObjectCacher.h | 45 ++++++ + .../SharedPersistentObjectCacherObjectDispatch.cc | 154 +++++++++++++++++++++ + .../SharedPersistentObjectCacherObjectDispatch.h | 127 +++++++++++++++++ + src/librbd/image/OpenRequest.cc | 12 +- + src/librbd/io/Types.h | 1 + + src/os/CacheStore/SyncFile.cc | 110 +++++++++++++++ + src/os/CacheStore/SyncFile.h | 74 ++++++++++ + src/test/librbd/test_mirroring.cc | 1 + + src/test/rbd_mirror/test_ImageReplayer.cc | 2 + + src/test/rbd_mirror/test_fixture.cc | 1 + + src/tools/CMakeLists.txt | 1 + + src/tools/rbd_cache/CMakeLists.txt | 9 ++ + src/tools/rbd_cache/CacheController.cc | 105 ++++++++++++++ + src/tools/rbd_cache/CacheController.hpp | 49 +++++++ + src/tools/rbd_cache/CacheControllerSocket.hpp | 125 +++++++++++++++++ + .../rbd_cache/CacheControllerSocketClient.hpp | 131 ++++++++++++++++++ + src/tools/rbd_cache/CacheControllerSocketCommon.h | 43 ++++++ + src/tools/rbd_cache/ObjectCacheStore.cc | 147 ++++++++++++++++++++ + src/tools/rbd_cache/ObjectCacheStore.h | 65 +++++++++ + src/tools/rbd_cache/main.cc | 85 ++++++++++++ + 25 files changed, 1365 insertions(+), 3 deletions(-) + create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.cc + create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.h + create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc + create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h + create mode 100644 src/os/CacheStore/SyncFile.cc + create mode 100644 src/os/CacheStore/SyncFile.h + create mode 100644 src/tools/rbd_cache/CMakeLists.txt + create mode 100644 src/tools/rbd_cache/CacheController.cc + create mode 100644 src/tools/rbd_cache/CacheController.hpp + create mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp + create mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp + create mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h + create mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc + create mode 100644 src/tools/rbd_cache/ObjectCacheStore.h + create mode 100644 src/tools/rbd_cache/main.cc + +diff --git a/src/common/options.cc b/src/common/options.cc +index c5afe4c..7839a31 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -6357,6 +6357,14 @@ static std::vector<Option> get_rbd_options() { + .set_default(60) + .set_description("time in seconds for detecting a hung thread"), + ++ Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) ++ .set_default(true) ++ .set_description("whether to enable shared ssd caching"), ++ ++ Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED) ++ .set_default("/tmp") ++ .set_description("shared ssd caching data dir"), ++ + Option("rbd_non_blocking_aio", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) + .set_default(true) + .set_description("process AIO ops from a dispatch thread to prevent blocking"), +diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt +index b9c08d4..92539a8 100644 +--- a/src/librbd/CMakeLists.txt ++++ b/src/librbd/CMakeLists.txt +@@ -32,7 +32,8 @@ set(librbd_internal_srcs + api/Snapshot.cc + cache/ImageWriteback.cc + cache/ObjectCacherObjectDispatch.cc +- cache/PassthroughImageCache.cc ++ cache/SharedPersistentObjectCacherObjectDispatch.cc ++ cache/SharedPersistentObjectCacher.cc + deep_copy/ImageCopyRequest.cc + deep_copy/MetadataCopyRequest.cc + deep_copy/ObjectCopyRequest.cc +@@ -123,6 +124,7 @@ set(librbd_internal_srcs + trash/MoveRequest.cc + watcher/Notifier.cc + watcher/RewatchRequest.cc ++ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc + ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) + + add_library(rbd_api STATIC librbd.cc) +diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc +index 48f98b1..349156b 100644 +--- a/src/librbd/ImageCtx.cc ++++ b/src/librbd/ImageCtx.cc +@@ -776,7 +776,8 @@ public: + "rbd_qos_read_iops_limit", false)( + "rbd_qos_write_iops_limit", false)( + "rbd_qos_read_bps_limit", false)( +- "rbd_qos_write_bps_limit", false); ++ "rbd_qos_write_bps_limit", false)( ++ "rbd_shared_cache_enabled", false); + + ConfigProxy local_config_t{false}; + std::map<std::string, bufferlist> res; +@@ -844,6 +845,8 @@ public: + ASSIGN_OPTION(qos_write_iops_limit, uint64_t); + ASSIGN_OPTION(qos_read_bps_limit, uint64_t); + ASSIGN_OPTION(qos_write_bps_limit, uint64_t); ++ ASSIGN_OPTION(shared_cache_enabled, bool); ++ ASSIGN_OPTION(shared_cache_path, std::string); + + if (thread_safe) { + ASSIGN_OPTION(journal_pool, std::string); +diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h +index d197c24..f661c09 100644 +--- a/src/librbd/ImageCtx.h ++++ b/src/librbd/ImageCtx.h +@@ -204,6 +204,9 @@ namespace librbd { + uint64_t qos_read_bps_limit; + uint64_t qos_write_bps_limit; + ++ bool shared_cache_enabled; ++ std::string shared_cache_path; ++ + LibrbdAdminSocketHook *asok_hook; + + exclusive_lock::Policy *exclusive_lock_policy = nullptr; +diff --git a/src/librbd/cache/SharedPersistentObjectCacher.cc b/src/librbd/cache/SharedPersistentObjectCacher.cc +new file mode 100644 +index 0000000..a849260 +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacher.cc +@@ -0,0 +1,61 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "librbd/cache/SharedPersistentObjectCacher.h" ++#include "include/buffer.h" ++#include "common/dout.h" ++#include "librbd/ImageCtx.h" ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacher: " << this \ ++ << " " << __func__ << ": " ++ ++namespace librbd { ++namespace cache { ++ ++template <typename I> ++SharedPersistentObjectCacher<I>::SharedPersistentObjectCacher(I *image_ctx, std::string cache_path) ++ : m_image_ctx(image_ctx), m_cache_path(cache_path), ++ m_file_map_lock("librbd::cache::SharedObjectCacher::filemaplock") { ++ auto *cct = m_image_ctx->cct; ++ ++} ++ ++template <typename I> ++SharedPersistentObjectCacher<I>::~SharedPersistentObjectCacher() { ++ for(auto &it: file_map) { ++ if(it.second) { ++ delete it.second; ++ } ++ } ++} ++ ++template <typename I> ++int SharedPersistentObjectCacher<I>::read_object(std::string oid, ceph::bufferlist* read_data, uint64_t offset, uint64_t length, Context *on_finish) { ++ ++ auto *cct = m_image_ctx->cct; ++ ldout(cct, 20) << "object: " << oid << dendl; ++ ++ std::string cache_file_name = m_image_ctx->data_ctx.get_pool_name() + oid; ++ ++ //TODO(): make a cache for cachefile fd ++ os::CacheStore::SyncFile* target_cache_file = new os::CacheStore::SyncFile(cct, cache_file_name); ++ target_cache_file->open(); ++ ++ int ret = target_cache_file->read_object_from_file(read_data, offset, length); ++ if (ret < 0) { ++ ldout(cct, 5) << "read from file return error: " << ret ++ << "file name= " << cache_file_name ++ << dendl; ++ } ++ ++ delete target_cache_file; ++ return ret; ++} ++ ++ ++} // namespace cache ++} // namespace librbd ++ ++template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>; +diff --git a/src/librbd/cache/SharedPersistentObjectCacher.h b/src/librbd/cache/SharedPersistentObjectCacher.h +new file mode 100644 +index 0000000..d108a05 +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacher.h +@@ -0,0 +1,45 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER ++#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER ++ ++#include "include/buffer_fwd.h" ++#include "include/int_types.h" ++#include "os/CacheStore/SyncFile.h" ++#include "common/Mutex.h" ++#include <vector> ++#include <map> ++ ++struct Context; ++ ++namespace librbd { ++ ++struct ImageCtx; ++ ++namespace cache { ++ ++template <typename ImageCtxT> ++class SharedPersistentObjectCacher { ++public: ++ ++ SharedPersistentObjectCacher(ImageCtxT *image_ctx, std::string cache_path); ++ ~SharedPersistentObjectCacher(); ++ ++ int read_object(std::string oid, ceph::bufferlist* read_data, ++ uint64_t offset, uint64_t length, Context *on_finish); ++ ++private: ++ ImageCtxT *m_image_ctx; ++ std::map<std::string, os::CacheStore::SyncFile*> file_map; ++ Mutex m_file_map_lock; ++ std::string m_cache_path; ++ ++}; ++ ++} // namespace cache ++} // namespace librbd ++ ++extern template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>; ++ ++#endif // CEPH_LIBRBD_CACHE_FILE_IMAGE_STORE +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +new file mode 100644 +index 0000000..90d886c +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +@@ -0,0 +1,154 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h" ++#include "common/WorkQueue.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/Journal.h" ++#include "librbd/Utils.h" ++#include "librbd/LibrbdWriteback.h" ++#include "librbd/io/ObjectDispatchSpec.h" ++#include "librbd/io/ObjectDispatcher.h" ++#include "librbd/io/Utils.h" ++#include "osd/osd_types.h" ++#include "osdc/WritebackHandler.h" ++#include <vector> ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \ ++ << this << " " << __func__ << ": " ++ ++namespace librbd { ++namespace cache { ++ ++template <typename I> ++SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch( ++ I* image_ctx) : m_image_ctx(image_ctx) { ++} ++ ++template <typename I> ++SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { ++ if (m_object_store) { ++ delete m_object_store; ++ } ++ ++ if (m_cache_client) { ++ delete m_cache_client; ++ } ++} ++ ++template <typename I> ++void SharedPersistentObjectCacherObjectDispatch<I>::init() { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 5) << dendl; ++ ++ if (m_image_ctx->parent != nullptr) { ++ //TODO(): should we cover multi-leveled clone? ++ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; ++ return; ++ } ++ ++ ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; ++ ++ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; ++ m_cache_client = new CacheClient(io_service, controller_path.c_str(), ++ ([&](std::string s){client_handle_request(s);})); ++ ++ int ret = m_cache_client->connect(); ++ if (ret < 0) { ++ ldout(cct, 5) << "SRO cache client fail to connect with local controller: " ++ << "please start rbd-cache daemon" ++ << dendl; ++ } else { ++ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " ++ << "name = " << m_image_ctx->id ++ << dendl; ++ ++ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), ++ m_image_ctx->id, m_image_ctx->size); ++ ++ if (ret >= 0) { ++ // add ourself to the IO object dispatcher chain ++ m_image_ctx->io_object_dispatcher->register_object_dispatch(this); ++ } ++ } ++} ++ ++template <typename I> ++bool SharedPersistentObjectCacherObjectDispatch<I>::read( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, librados::snap_t snap_id, int op_flags, ++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, ++ io::ExtentMap* extent_map, int* object_dispatch_flags, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ // IO chained in reverse order ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" ++ << object_len << dendl; ++ ++ // ensure we aren't holding the cache lock post-read ++ on_dispatched = util::create_async_context_callback(*m_image_ctx, ++ on_dispatched); ++ ++ if (m_cache_client && m_cache_client->connected && m_object_store) { ++ bool exists; ++ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), ++ m_image_ctx->id, oid, &exists); ++ ++ // try to read from parent image ++ ldout(cct, 20) << "SRO cache object exists:" << exists << dendl; ++ if (exists) { ++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); ++ if (r != 0) { ++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; ++ on_dispatched->complete(r); ++ return true; ++ } ++ } ++ } ++ ++ ldout(cct, 20) << "Continue read from RADOS" << dendl; ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ return true; ++} ++ ++template <typename I> ++void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << dendl; ++ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); ++ ++ switch (io_ctx->type) { ++ case RBDSC_REGISTER_REPLY: { ++ // open cache handler for volume ++ ldout(cct, 20) << "SRO cache client open cache handler" << dendl; ++ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); ++ ++ break; ++ } ++ case RBDSC_READ_REPLY: { ++ ldout(cct, 20) << "SRO cache client start to read cache" << dendl; ++ //TODO(): should call read here ++ ++ break; ++ } ++ case RBDSC_READ_RADOS: { ++ ldout(cct, 20) << "SRO cache client start to read rados" << dendl; ++ //TODO(): should call read here ++ ++ break; ++ } ++ default: ldout(cct, 20) << "nothing" << dendl; ++ break; ++ ++ } ++} ++ ++} // namespace cache ++} // namespace librbd ++ ++template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +new file mode 100644 +index 0000000..1ede804 +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +@@ -0,0 +1,127 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H ++#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H ++ ++#include "librbd/io/ObjectDispatchInterface.h" ++#include "common/Mutex.h" ++#include "osdc/ObjectCacher.h" ++#include "tools/rbd_cache/CacheControllerSocketClient.hpp" ++#include "SharedPersistentObjectCacher.h" ++ ++struct WritebackHandler; ++ ++namespace librbd { ++ ++class ImageCtx; ++ ++namespace cache { ++ ++/** ++ * Facade around the OSDC object cacher to make it align with ++ * the object dispatcher interface ++ */ ++template <typename ImageCtxT = ImageCtx> ++class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface { ++public: ++ static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) { ++ return new SharedPersistentObjectCacherObjectDispatch(image_ctx); ++ } ++ ++ SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx); ++ ~SharedPersistentObjectCacherObjectDispatch() override; ++ ++ io::ObjectDispatchLayer get_object_dispatch_layer() const override { ++ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; ++ } ++ ++ void init(); ++ void shut_down(Context* on_finish) { ++ m_image_ctx->op_work_queue->queue(on_finish, 0); ++ } ++ ++ bool read( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, librados::snap_t snap_id, int op_flags, ++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, ++ io::ExtentMap* extent_map, int* object_dispatch_flags, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) override; ++ ++ bool discard( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, const ::SnapContext &snapc, int discard_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool write( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool write_same( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, io::Extents&& buffer_extents, ++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool compare_and_write( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, ++ const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, ++ int* object_dispatch_flags, uint64_t* journal_tid, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ return false; ++ } ++ ++ bool flush( ++ io::FlushSource flush_source, const ZTracer::Trace &parent_trace, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ return false; ++ } ++ ++ bool invalidate_cache(Context* on_finish) { ++ return false; ++ } ++ ++ bool reset_existence_cache(Context* on_finish) { ++ return false; ++ } ++ ++ void extent_overwritten( ++ uint64_t object_no, uint64_t object_off, uint64_t object_len, ++ uint64_t journal_tid, uint64_t new_journal_tid) { ++ } ++ ++ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; ++ ++private: ++ ++ ImageCtxT* m_image_ctx; ++ ++ void client_handle_request(std::string msg); ++ CacheClient *m_cache_client = nullptr; ++ boost::asio::io_service io_service; ++}; ++ ++} // namespace cache ++} // namespace librbd ++ ++extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; ++ ++#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H +diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc +index ae18739..30a7b66 100644 +--- a/src/librbd/image/OpenRequest.cc ++++ b/src/librbd/image/OpenRequest.cc +@@ -8,6 +8,7 @@ + #include "librbd/ImageCtx.h" + #include "librbd/Utils.h" + #include "librbd/cache/ObjectCacherObjectDispatch.h" ++#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc" + #include "librbd/image/CloseRequest.h" + #include "librbd/image/RefreshRequest.h" + #include "librbd/image/SetSnapRequest.h" +@@ -448,12 +449,21 @@ Context *OpenRequest<I>::handle_refresh(int *result) { + + template <typename I> + Context *OpenRequest<I>::send_init_cache(int *result) { ++ ++ CephContext *cct = m_image_ctx->cct; + // cache is disabled or parent image context + if (!m_image_ctx->cache || m_image_ctx->child != nullptr) { ++ ++ // enable Shared Read-only cache for parent image ++ if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) { ++ ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl; ++ auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx); ++ sro_cache->init(); ++ } ++ + return send_register_watch(result); + } + +- CephContext *cct = m_image_ctx->cct; + ldout(cct, 10) << this << " " << __func__ << dendl; + + auto cache = cache::ObjectCacherObjectDispatch<I>::create(m_image_ctx); +diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h +index 7e09c90..ef3049f 100644 +--- a/src/librbd/io/Types.h ++++ b/src/librbd/io/Types.h +@@ -59,6 +59,7 @@ enum DispatchResult { + enum ObjectDispatchLayer { + OBJECT_DISPATCH_LAYER_NONE = 0, + OBJECT_DISPATCH_LAYER_CACHE, ++ OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE, + OBJECT_DISPATCH_LAYER_JOURNAL, + OBJECT_DISPATCH_LAYER_CORE, + OBJECT_DISPATCH_LAYER_LAST +diff --git a/src/os/CacheStore/SyncFile.cc b/src/os/CacheStore/SyncFile.cc +new file mode 100644 +index 0000000..5352bde +--- /dev/null ++++ b/src/os/CacheStore/SyncFile.cc +@@ -0,0 +1,110 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "os/CacheStore/SyncFile.h" ++#include "include/Context.h" ++#include "common/dout.h" ++#include "common/WorkQueue.h" ++#include "librbd/ImageCtx.h" ++#include <sys/types.h> ++#include <sys/stat.h> ++#include <aio.h> ++#include <errno.h> ++#include <fcntl.h> ++#include <utility> ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::file::SyncFile: " << this << " " \ ++ << __func__ << ": " ++ ++namespace os { ++namespace CacheStore { ++ ++SyncFile::SyncFile(CephContext *cct, const std::string &name) ++ : cct(cct) ++{ ++ m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; ++ ldout(cct, 20) << "file path=" << m_name << dendl; ++} ++ ++SyncFile::~SyncFile() ++{ ++ // TODO force proper cleanup ++ if (m_fd != -1) { ++ ::close(m_fd); ++ } ++} ++ ++void SyncFile::open(Context *on_finish) ++{ ++ while (true) { ++ m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC, ++ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); ++ if (m_fd == -1) { ++ int r = -errno; ++ if (r == -EINTR) { ++ continue; ++ } ++ on_finish->complete(r); ++ return; ++ } ++ break; ++ } ++ ++ on_finish->complete(0); ++} ++ ++void SyncFile::open() ++{ ++ while (true) ++ { ++ m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC, ++ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); ++ if (m_fd == -1) ++ { ++ int r = -errno; ++ if (r == -EINTR) { ++ continue; ++ } ++ return; ++ } ++ break; ++ } ++} ++ ++int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) { ++ ++ ldout(cct, 20) << "cache file name:" << m_name ++ << ", length:" << object_len << dendl; ++ ++ // TODO(): aio ++ int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0); ++ if(ret < 0) { ++ lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl; ++ return ret; ++ } ++ ++ return ret; ++} ++ ++int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) { ++ ++ ldout(cct, 20) << "offset:" << object_off ++ << ", length:" << object_len << dendl; ++ ++ bufferptr buf(object_len); ++ ++ // TODO(): aio ++ int ret = pread(m_fd, buf.c_str(), object_len, object_off); ++ if(ret < 0) { ++ lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl; ++ return ret; ++ } ++ read_buf->append(std::move(buf)); ++ ++ return ret; ++} ++ ++} // namespace CacheStore ++} // namespace os +diff --git a/src/os/CacheStore/SyncFile.h b/src/os/CacheStore/SyncFile.h +new file mode 100644 +index 0000000..81602ce +--- /dev/null ++++ b/src/os/CacheStore/SyncFile.h +@@ -0,0 +1,74 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBOS_CACHE_STORE_SYNC_FILE ++#define CEPH_LIBOS_CACHE_STORE_SYNC_FILE ++ ++#include "include/buffer_fwd.h" ++#include <sys/mman.h> ++#include <string> ++ ++struct Context; ++struct ContextWQ; ++class CephContext; ++ ++namespace os { ++ ++namespace CacheStore { ++ ++class SyncFile { ++public: ++ SyncFile(CephContext *cct, const std::string &name); ++ ~SyncFile(); ++ ++ // TODO use IO queue instead of individual commands so operations can be ++ // submitted in batch ++ ++ // TODO use scatter/gather API ++ ++ void open(Context *on_finish); ++ ++ // ## ++ void open(); ++ bool try_open(); ++ void close(Context *on_finish); ++ void remove(Context *on_finish); ++ ++ void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish); ++ ++ void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish); ++ ++ void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish); ++ ++ void truncate(uint64_t length, bool fdatasync, Context *on_finish); ++ ++ void fsync(Context *on_finish); ++ ++ void fdatasync(Context *on_finish); ++ ++ uint64_t filesize(); ++ ++ int load(void** dest, uint64_t filesize); ++ ++ int remove(); ++ ++ // ## ++ int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len); ++ int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len); ++ ++private: ++ CephContext *cct; ++ std::string m_name; ++ int m_fd = -1; ++ ++ int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync); ++ int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl); ++ int discard(uint64_t offset, uint64_t length, bool fdatasync); ++ int truncate(uint64_t length, bool fdatasync); ++ int fdatasync(); ++}; ++ ++} // namespace CacheStore ++} // namespace os ++ ++#endif // CEPH_LIBOS_CACHE_STORE_SYNC_FILE +diff --git a/src/test/librbd/test_mirroring.cc b/src/test/librbd/test_mirroring.cc +index b4fdeae..d7d1aa6 100644 +--- a/src/test/librbd/test_mirroring.cc ++++ b/src/test/librbd/test_mirroring.cc +@@ -47,6 +47,7 @@ public: + + void SetUp() override { + ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx)); ++ ASSERT_EQ(0, _rados.conf_set("rbd_shared_cache_enabled", "false")); + } + + std::string image_name = "mirrorimg1"; +diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc +index 8a95a65..b5598bd 100644 +--- a/src/test/rbd_mirror/test_ImageReplayer.cc ++++ b/src/test/rbd_mirror/test_ImageReplayer.cc +@@ -90,6 +90,7 @@ public: + EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get())); + EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false")); + EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_poll_age", "1")); ++ EXPECT_EQ(0, m_local_cluster->conf_set("rbd_shared_cache_enabled", "false")); + + m_local_pool_name = get_temp_pool_name(); + EXPECT_EQ(0, m_local_cluster->pool_create(m_local_pool_name.c_str())); +@@ -99,6 +100,7 @@ public: + + EXPECT_EQ("", connect_cluster_pp(m_remote_cluster)); + EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_cache", "false")); ++ EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_shared_cache_enabled", "false")); + + m_remote_pool_name = get_temp_pool_name(); + EXPECT_EQ(0, m_remote_cluster.pool_create(m_remote_pool_name.c_str())); +diff --git a/src/test/rbd_mirror/test_fixture.cc b/src/test/rbd_mirror/test_fixture.cc +index b2a51ca..9e77098 100644 +--- a/src/test/rbd_mirror/test_fixture.cc ++++ b/src/test/rbd_mirror/test_fixture.cc +@@ -27,6 +27,7 @@ void TestFixture::SetUpTestCase() { + _rados = std::shared_ptr<librados::Rados>(new librados::Rados()); + ASSERT_EQ("", connect_cluster_pp(*_rados.get())); + ASSERT_EQ(0, _rados->conf_set("rbd_cache", "false")); ++ ASSERT_EQ(0, _rados->conf_set("rbd_shared_cache_enabled", "false")); + + _local_pool_name = get_temp_pool_name("test-rbd-mirror-"); + ASSERT_EQ(0, _rados->pool_create(_local_pool_name.c_str())); +diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt +index 3789e3c..72ab342 100644 +--- a/src/tools/CMakeLists.txt ++++ b/src/tools/CMakeLists.txt +@@ -99,6 +99,7 @@ endif(WITH_CEPHFS) + if(WITH_RBD) + add_subdirectory(rbd) + add_subdirectory(rbd_mirror) ++ add_subdirectory(rbd_cache) + if(LINUX) + add_subdirectory(rbd_nbd) + endif() +diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt +new file mode 100644 +index 0000000..08eae60 +--- /dev/null ++++ b/src/tools/rbd_cache/CMakeLists.txt +@@ -0,0 +1,9 @@ ++add_executable(rbd-cache ++ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc ++ ObjectCacheStore.cc ++ CacheController.cc ++ main.cc) ++target_link_libraries(rbd-cache ++ librados ++ global) ++install(TARGETS rbd-cache DESTINATION bin) +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +new file mode 100644 +index 0000000..c914358 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheController.cc +@@ -0,0 +1,105 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "CacheController.hpp" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_rbd_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ ++ << __func__ << ": " ++ ++ ++class ThreadPoolSingleton : public ThreadPool { ++public: ++ ContextWQ *op_work_queue; ++ ++ explicit ThreadPoolSingleton(CephContext *cct) ++ : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, ++ "pcache_threads"), ++ op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", ++ cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"), ++ this)) { ++ start(); ++ } ++ ~ThreadPoolSingleton() override { ++ op_work_queue->drain(); ++ delete op_work_queue; ++ ++ stop(); ++ } ++}; ++ ++ ++CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): ++ m_args(args), m_cct(cct) { ++ ++} ++ ++CacheController::~CacheController() { ++ ++} ++ ++int CacheController::init() { ++ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( ++ "rbd::cache::thread_pool", false, m_cct); ++ pcache_op_work_queue = thread_pool_singleton->op_work_queue; ++ ++ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); ++ int r = m_object_cache_store->init(false); ++ if (r < 0) { ++ //derr << "init error\n" << dendl; ++ } ++ return r; ++} ++ ++int CacheController::shutdown() { ++ int r = m_object_cache_store->shutdown(); ++ return r; ++} ++ ++void CacheController::handle_signal(int signum){} ++ ++void CacheController::run() { ++ try { ++ //TODO(): use new socket path ++ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; ++ std::remove(controller_path.c_str()); ++ ++ m_cache_server = new CacheServer(io_service, controller_path, ++ ([&](uint64_t p, std::string s){handle_request(p, s);})); ++ io_service.run(); ++ } catch (std::exception& e) { ++ std::cerr << "Exception: " << e.what() << "\n"; ++ } ++} ++ ++void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); ++ ++ int ret = 0; ++ ++ switch (io_ctx->type) { ++ case RBDSC_REGISTER: { ++ // init cache layout for volume ++ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); ++ io_ctx->type = RBDSC_REGISTER_REPLY; ++ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); ++ ++ break; ++ } ++ case RBDSC_READ: { ++ // lookup object in local cache store ++ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); ++ if (ret < 0) { ++ io_ctx->type = RBDSC_READ_RADOS; ++ } else { ++ io_ctx->type = RBDSC_READ_REPLY; ++ } ++ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); ++ ++ break; ++ } ++ ++ } ++} +diff --git a/src/tools/rbd_cache/CacheController.hpp b/src/tools/rbd_cache/CacheController.hpp +new file mode 100644 +index 0000000..97113e4 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheController.hpp +@@ -0,0 +1,49 @@ ++#ifndef CACHE_CONTROLLER_H ++#define CACHE_CONTROLLER_H ++ ++#include <thread> ++ ++#include "common/Formatter.h" ++#include "common/admin_socket.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "common/WorkQueue.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "include/assert.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++ ++#include "CacheControllerSocket.hpp" ++#include "ObjectCacheStore.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++class CacheController { ++ public: ++ CacheController(CephContext *cct, const std::vector<const char*> &args); ++ ~CacheController(); ++ ++ int init(); ++ ++ int shutdown(); ++ ++ void handle_signal(int sinnum); ++ ++ void run(); ++ ++ void handle_request(uint64_t sesstion_id, std::string msg); ++ ++ private: ++ boost::asio::io_service io_service; ++ CacheServer *m_cache_server; ++ std::vector<const char*> m_args; ++ CephContext *m_cct; ++ ObjectCacheStore *m_object_cache_store; ++ ContextWQ* pcache_op_work_queue; ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +new file mode 100644 +index 0000000..6e1a743 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp +@@ -0,0 +1,125 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CACHE_CONTROLLER_SOCKET_H ++#define CACHE_CONTROLLER_SOCKET_H ++ ++#include <cstdio> ++#include <iostream> ++#include <array> ++#include <memory> ++#include <string> ++#include <boost/bind.hpp> ++#include <boost/asio.hpp> ++#include <boost/algorithm/string.hpp> ++#include "CacheControllerSocketCommon.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++class session : public std::enable_shared_from_this<session> { ++public: ++ session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) ++ : session_id(session_id), socket_(io_service), process_msg(processmsg) {} ++ ++ stream_protocol::socket& socket() { ++ return socket_; ++ } ++ ++ void start() { ++ ++ boost::asio::async_read(socket_, boost::asio::buffer(data_), ++ boost::asio::transfer_exactly(544), ++ boost::bind(&session::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ ++ } ++ ++ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { ++ ++ if (!error) { ++ ++ process_msg(session_id, std::string(data_, bytes_transferred)); ++ ++ } ++ } ++ ++ void handle_write(const boost::system::error_code& error) { ++ if (!error) { ++ socket_.async_read_some(boost::asio::buffer(data_), ++ boost::bind(&session::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ } ++ } ++ ++ void send(std::string msg) { ++ ++ boost::asio::async_write(socket_, ++ boost::asio::buffer(msg.c_str(), msg.size()), ++ boost::bind(&session::handle_write, ++ shared_from_this(), ++ boost::asio::placeholders::error)); ++ ++ } ++ ++private: ++ uint64_t session_id; ++ stream_protocol::socket socket_; ++ ProcessMsg process_msg; ++ ++ // Buffer used to store data received from the client. ++ //std::array<char, 1024> data_; ++ char data_[1024]; ++}; ++ ++typedef std::shared_ptr<session> session_ptr; ++ ++class CacheServer { ++public: ++ CacheServer(boost::asio::io_service& io_service, ++ const std::string& file, ProcessMsg processmsg) ++ : io_service_(io_service), ++ server_process_msg(processmsg), ++ acceptor_(io_service, stream_protocol::endpoint(file)) ++ { ++ session_ptr new_session(new session(session_id, io_service_, server_process_msg)); ++ acceptor_.async_accept(new_session->socket(), ++ boost::bind(&CacheServer::handle_accept, this, new_session, ++ boost::asio::placeholders::error)); ++ } ++ ++ void handle_accept(session_ptr new_session, ++ const boost::system::error_code& error) ++ { ++ //TODO(): open librbd snap ++ if (!error) { ++ new_session->start(); ++ session_map.emplace(session_id, new_session); ++ session_id++; ++ new_session.reset(new session(session_id, io_service_, server_process_msg)); ++ acceptor_.async_accept(new_session->socket(), ++ boost::bind(&CacheServer::handle_accept, this, new_session, ++ boost::asio::placeholders::error)); ++ } ++ } ++ ++ void send(uint64_t session_id, std::string msg) { ++ auto it = session_map.find(session_id); ++ if (it != session_map.end()) { ++ it->second->send(msg); ++ } ++ } ++ ++private: ++ boost::asio::io_service& io_service_; ++ ProcessMsg server_process_msg; ++ stream_protocol::acceptor acceptor_; ++ uint64_t session_id = 1; ++ std::map<uint64_t, session_ptr> session_map; ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +new file mode 100644 +index 0000000..8e61aa9 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -0,0 +1,131 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H ++#define CACHE_CONTROLLER_SOCKET_CLIENT_H ++ ++#include <boost/asio.hpp> ++#include <boost/bind.hpp> ++#include <boost/algorithm/string.hpp> ++#include "include/assert.h" ++#include "CacheControllerSocketCommon.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++class CacheClient { ++public: ++ CacheClient(boost::asio::io_service& io_service, ++ const std::string& file, ClientProcessMsg processmsg) ++ : io_service_(io_service), ++ io_service_work_(io_service), ++ socket_(io_service), ++ m_client_process_msg(processmsg), ++ ep_(stream_protocol::endpoint(file)) ++ { ++ std::thread thd([this](){io_service_.run(); }); ++ thd.detach(); ++ } ++ ++ void run(){ ++ } ++ ++ int connect() { ++ try { ++ socket_.connect(ep_); ++ } catch (std::exception& e) { ++ return -1; ++ } ++ connected = true; ++ return 0; ++ } ++ ++ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { ++ // cache controller will init layout ++ rbdsc_req_type_t *message = new rbdsc_req_type_t(); ++ message->type = RBDSC_REGISTER; ++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); ++ memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); ++ message->vol_size = vol_size; ++ message->offset = 0; ++ message->length = 0; ++ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), ++ [this](const boost::system::error_code& err, size_t cb) { ++ if (!err) { ++ boost::asio::async_read(socket_, boost::asio::buffer(buffer_), ++ boost::asio::transfer_exactly(544), ++ [this](const boost::system::error_code& err, size_t cb) { ++ if (!err) { ++ m_client_process_msg(std::string(buffer_, cb)); ++ } else { ++ return -1; ++ } ++ }); ++ } else { ++ return -1; ++ } ++ }); ++ ++ return 0; ++ } ++ ++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) { ++ rbdsc_req_type_t *message = new rbdsc_req_type_t(); ++ message->type = RBDSC_READ; ++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); ++ memcpy(message->vol_name, object_id.c_str(), object_id.size()); ++ message->vol_size = 0; ++ message->offset = 0; ++ message->length = 0; ++ ++ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), ++ [this, result](const boost::system::error_code& err, size_t cb) { ++ if (!err) { ++ get_result(result); ++ } else { ++ return -1; ++ } ++ }); ++ std::unique_lock<std::mutex> lk(m); ++ cv.wait(lk); ++ return 0; ++ } ++ ++ void get_result(bool* result) { ++ boost::asio::async_read(socket_, boost::asio::buffer(buffer_), ++ boost::asio::transfer_exactly(544), ++ [this, result](const boost::system::error_code& err, size_t cb) { ++ if (!err) { ++ *result = true; ++ cv.notify_one(); ++ m_client_process_msg(std::string(buffer_, cb)); ++ } else { ++ return -1; ++ } ++ }); ++ } ++ ++ void handle_connect(const boost::system::error_code& error) { ++ //TODO(): open librbd snap ++ } ++ ++ void handle_write(const boost::system::error_code& error) { ++ } ++ ++private: ++ boost::asio::io_service& io_service_; ++ boost::asio::io_service::work io_service_work_; ++ stream_protocol::socket socket_; ++ ClientProcessMsg m_client_process_msg; ++ stream_protocol::endpoint ep_; ++ char buffer_[1024]; ++ int block_size_ = 1024; ++ ++ std::condition_variable cv; ++ std::mutex m; ++ ++public: ++ bool connected = false; ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +new file mode 100644 +index 0000000..e253bb1 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h +@@ -0,0 +1,43 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H ++#define CACHE_CONTROLLER_SOCKET_COMMON_H ++ ++#define RBDSC_REGISTER 0X11 ++#define RBDSC_READ 0X12 ++#define RBDSC_LOOKUP 0X13 ++#define RBDSC_REGISTER_REPLY 0X14 ++#define RBDSC_READ_REPLY 0X15 ++#define RBDSC_LOOKUP_REPLY 0X16 ++#define RBDSC_READ_RADOS 0X16 ++ ++typedef std::function<void(uint64_t, std::string)> ProcessMsg; ++typedef std::function<void(std::string)> ClientProcessMsg; ++typedef uint8_t rbdsc_req_type; ++struct rbdsc_req_type_t { ++ rbdsc_req_type type; ++ uint64_t vol_size; ++ uint64_t offset; ++ uint64_t length; ++ char pool_name[256]; ++ char vol_name[256]; ++ ++ uint64_t size() { ++ return sizeof(rbdsc_req_type_t); ++ } ++ ++ std::string to_buffer() { ++ std::stringstream ss; ++ ss << type; ++ ss << vol_size; ++ ss << offset; ++ ss << length; ++ ss << pool_name; ++ ss << vol_name; ++ ++ return ss.str(); ++ } ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc +new file mode 100644 +index 0000000..90b407c +--- /dev/null ++++ b/src/tools/rbd_cache/ObjectCacheStore.cc +@@ -0,0 +1,147 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "ObjectCacheStore.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_rbd_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ ++ << __func__ << ": " ++ ++ ++ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) ++ : m_cct(cct), m_work_queue(work_queue), ++ m_cache_table_lock("rbd::cache::ObjectCacheStore"), ++ m_rados(new librados::Rados()) { ++} ++ ++ObjectCacheStore::~ObjectCacheStore() { ++ ++} ++ ++int ObjectCacheStore::init(bool reset) { ++ ++ int ret = m_rados->init_with_context(m_cct); ++ if(ret < 0) { ++ lderr(m_cct) << "fail to init Ceph context" << dendl; ++ return ret; ++ } ++ ++ ret = m_rados->connect(); ++ if(ret < 0 ) { ++ lderr(m_cct) << "fail to conect to cluster" << dendl; ++ return ret; ++ } ++ //TODO(): check existing cache objects ++ return ret; ++} ++ ++int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { ++ int ret = 0; ++ std::string cache_file_name = pool_name + object_name; ++ ++ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { ++ librados::IoCtx* io_ctx = new librados::IoCtx(); ++ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); ++ if (ret < 0) { ++ lderr(m_cct) << "fail to create ioctx" << dendl; ++ assert(0); ++ } ++ m_ioctxs.emplace(pool_name, io_ctx); ++ } ++ ++ assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); ++ ++ librados::IoCtx* ioctx = m_ioctxs[pool_name]; ++ ++ //promoting: update metadata ++ { ++ Mutex::Locker locker(m_cache_table_lock); ++ m_cache_table.emplace(cache_file_name, PROMOTING); ++ } ++ ++ librados::bufferlist read_buf; ++ int object_size = 4096*1024; //TODO(): read config from image metadata ++ ++ //TODO(): async promote ++ ret = promote_object(ioctx, object_name, read_buf, object_size); ++ if (ret == -ENOENT) { ++ read_buf.append(std::string(object_size, '0')); ++ ret = 0; ++ } ++ ++ if( ret < 0) { ++ lderr(m_cct) << "fail to read from rados" << dendl; ++ return ret; ++ } ++ ++ // persistent to cache ++ os::CacheStore::SyncFile cache_file(m_cct, cache_file_name); ++ cache_file.open(); ++ ret = cache_file.write_object_to_file(read_buf, object_size); ++ ++ assert(m_cache_table.find(cache_file_name) != m_cache_table.end()); ++ ++ // update metadata ++ { ++ Mutex::Locker locker(m_cache_table_lock); ++ m_cache_table.emplace(cache_file_name, PROMOTED); ++ } ++ ++ return ret; ++ ++} ++ ++int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { ++ ++ std::string cache_file_name = pool_name + object_name; ++ { ++ Mutex::Locker locker(m_cache_table_lock); ++ ++ auto it = m_cache_table.find(cache_file_name); ++ if (it != m_cache_table.end()) { ++ ++ if (it->second == PROMOTING) { ++ return -1; ++ } else if (it->second == PROMOTED) { ++ return 0; ++ } else { ++ assert(0); ++ } ++ } ++ } ++ ++ int ret = do_promote(pool_name, object_name); ++ ++ return ret; ++} ++ ++int ObjectCacheStore::shutdown() { ++ m_rados->shutdown(); ++ return 0; ++} ++ ++int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { ++ return 0; ++} ++ ++int ObjectCacheStore::lock_cache(std::string vol_name) { ++ return 0; ++} ++ ++int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist read_buf, uint64_t read_len) { ++ int ret; ++ ++ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); ++ ++ ret = ioctx->aio_read(object_name, read_completion, &read_buf, read_len, 0); ++ if(ret < 0) { ++ lderr(m_cct) << "fail to read from rados" << dendl; ++ return ret; ++ } ++ read_completion->wait_for_complete(); ++ ret = read_completion->get_return_value(); ++ return ret; ++ ++} +diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h +new file mode 100644 +index 0000000..12f8399 +--- /dev/null ++++ b/src/tools/rbd_cache/ObjectCacheStore.h +@@ -0,0 +1,65 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef OBJECT_CACHE_STORE_H ++#define OBJECT_CACHE_STORE_H ++ ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++#include "os/CacheStore/SyncFile.h" ++ ++using librados::Rados; ++using librados::IoCtx; ++ ++typedef shared_ptr<librados::Rados> RadosRef; ++typedef shared_ptr<librados::IoCtx> IoCtxRef; ++ ++class ObjectCacheStore ++{ ++ public: ++ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); ++ ~ObjectCacheStore(); ++ ++ int init(bool reset); ++ ++ int shutdown(); ++ ++ int lookup_object(std::string pool_name, std::string object_name); ++ ++ int init_cache(std::string vol_name, uint64_t vol_size); ++ ++ int lock_cache(std::string vol_name); ++ ++ private: ++ int _evict_object(); ++ ++ int do_promote(std::string pool_name, std::string object_name); ++ ++ int promote_object(librados::IoCtx*, std::string object_name, ++ librados::bufferlist read_buf, ++ uint64_t length); ++ ++ enum { ++ PROMOTING = 0, ++ PROMOTED, ++ }; ++ ++ CephContext *m_cct; ++ ContextWQ* m_work_queue; ++ Mutex m_cache_table_lock; ++ RadosRef m_rados; ++ ++ std::map<std::string, uint8_t> m_cache_table; ++ ++ std::map<std::string, librados::IoCtx*> m_ioctxs; ++ ++ os::CacheStore::SyncFile *m_cache_file; ++}; ++ ++#endif +diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc +new file mode 100644 +index 0000000..336a581 +--- /dev/null ++++ b/src/tools/rbd_cache/main.cc +@@ -0,0 +1,85 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/ceph_argparse.h" ++#include "common/config.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "global/global_init.h" ++#include "global/signal_handler.h" ++#include "CacheController.hpp" ++ ++#include <vector> ++ ++CacheController *cachectl = nullptr; ++ ++void usage() { ++ std::cout << "usage: cache controller [options...]" << std::endl; ++ std::cout << "options:\n"; ++ std::cout << " -m monaddress[:port] connect to specified monitor\n"; ++ std::cout << " --keyring=<path> path to keyring for local cluster\n"; ++ std::cout << " --log-file=<logfile> file to log debug output\n"; ++ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; ++ generic_server_usage(); ++} ++ ++static void handle_signal(int signum) ++{ ++ if (cachectl) ++ cachectl->handle_signal(signum); ++} ++ ++int main(int argc, const char **argv) ++{ ++ std::vector<const char*> args; ++ env_to_vec(args); ++ argv_to_vec(argc, argv, args); ++ ++ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, ++ CODE_ENVIRONMENT_DAEMON, ++ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); ++ ++ for (auto i = args.begin(); i != args.end(); ++i) { ++ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { ++ usage(); ++ return EXIT_SUCCESS; ++ } ++ } ++ ++ if (g_conf->daemonize) { ++ global_init_daemonize(g_ceph_context); ++ } ++ g_ceph_context->enable_perf_counter(); ++ ++ common_init_finish(g_ceph_context); ++ ++ init_async_signal_handler(); ++ register_async_signal_handler(SIGHUP, sighup_handler); ++ register_async_signal_handler_oneshot(SIGINT, handle_signal); ++ register_async_signal_handler_oneshot(SIGTERM, handle_signal); ++ ++ std::vector<const char*> cmd_args; ++ argv_to_vec(argc, argv, cmd_args); ++ ++ // disable unnecessary librbd cache ++ g_ceph_context->_conf->set_val_or_die("rbd_cache", "false"); ++ ++ cachectl = new CacheController(g_ceph_context, cmd_args); ++ int r = cachectl->init(); ++ if (r < 0) { ++ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; ++ goto cleanup; ++ } ++ ++ cachectl->run(); ++ ++ cleanup: ++ unregister_async_signal_handler(SIGHUP, sighup_handler); ++ unregister_async_signal_handler(SIGINT, handle_signal); ++ unregister_async_signal_handler(SIGTERM, handle_signal); ++ shutdown_async_signal_handler(); ++ ++ delete cachectl; ++ ++ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; ++} +-- +2.7.4 + diff --git a/src/ceph/0002-librbd-cleanup-rbd-shared-RO-cache.patch b/src/ceph/0002-librbd-cleanup-rbd-shared-RO-cache.patch new file mode 100644 index 0000000..7fb78e3 --- /dev/null +++ b/src/ceph/0002-librbd-cleanup-rbd-shared-RO-cache.patch @@ -0,0 +1,847 @@ +From 26f4a0804c035895fd77e9a70f47ede3f4512bde Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Wed, 20 Jun 2018 11:34:17 +0800 +Subject: [PATCH 02/10] librbd: cleanup rbd shared RO cache + +- adding namespace for rbd cache controller +- move SyncFile code under librbd/cache/ + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/librbd/CMakeLists.txt | 2 +- + src/librbd/cache/SharedPersistentObjectCacher.cc | 4 +- + src/librbd/cache/SharedPersistentObjectCacher.h | 6 +- + .../cache/SharedPersistentObjectCacherFile.cc | 114 +++++++++++++++++++++ + .../cache/SharedPersistentObjectCacherFile.h | 74 +++++++++++++ + .../SharedPersistentObjectCacherObjectDispatch.cc | 4 +- + .../SharedPersistentObjectCacherObjectDispatch.h | 2 +- + src/os/CacheStore/SyncFile.cc | 110 -------------------- + src/os/CacheStore/SyncFile.h | 74 ------------- + src/tools/rbd_cache/CMakeLists.txt | 2 +- + src/tools/rbd_cache/CacheController.cc | 9 +- + src/tools/rbd_cache/CacheController.h | 55 ++++++++++ + src/tools/rbd_cache/CacheController.hpp | 49 --------- + src/tools/rbd_cache/CacheControllerSocket.hpp | 6 ++ + .../rbd_cache/CacheControllerSocketClient.hpp | 5 + + src/tools/rbd_cache/CacheControllerSocketCommon.h | 5 + + src/tools/rbd_cache/ObjectCacheStore.cc | 7 +- + src/tools/rbd_cache/ObjectCacheStore.h | 9 +- + src/tools/rbd_cache/main.cc | 6 +- + 19 files changed, 293 insertions(+), 250 deletions(-) + create mode 100644 src/librbd/cache/SharedPersistentObjectCacherFile.cc + create mode 100644 src/librbd/cache/SharedPersistentObjectCacherFile.h + delete mode 100644 src/os/CacheStore/SyncFile.cc + delete mode 100644 src/os/CacheStore/SyncFile.h + create mode 100644 src/tools/rbd_cache/CacheController.h + delete mode 100644 src/tools/rbd_cache/CacheController.hpp + +diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt +index 92539a8..540ee78 100644 +--- a/src/librbd/CMakeLists.txt ++++ b/src/librbd/CMakeLists.txt +@@ -34,6 +34,7 @@ set(librbd_internal_srcs + cache/ObjectCacherObjectDispatch.cc + cache/SharedPersistentObjectCacherObjectDispatch.cc + cache/SharedPersistentObjectCacher.cc ++ cache/SharedPersistentObjectCacherFile.cc + deep_copy/ImageCopyRequest.cc + deep_copy/MetadataCopyRequest.cc + deep_copy/ObjectCopyRequest.cc +@@ -124,7 +125,6 @@ set(librbd_internal_srcs + trash/MoveRequest.cc + watcher/Notifier.cc + watcher/RewatchRequest.cc +- ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc + ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) + + add_library(rbd_api STATIC librbd.cc) +diff --git a/src/librbd/cache/SharedPersistentObjectCacher.cc b/src/librbd/cache/SharedPersistentObjectCacher.cc +index a849260..260567c 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacher.cc ++++ b/src/librbd/cache/SharedPersistentObjectCacher.cc +@@ -19,7 +19,7 @@ SharedPersistentObjectCacher<I>::SharedPersistentObjectCacher(I *image_ctx, std: + : m_image_ctx(image_ctx), m_cache_path(cache_path), + m_file_map_lock("librbd::cache::SharedObjectCacher::filemaplock") { + auto *cct = m_image_ctx->cct; +- ++ ldout(cct, 20) << dendl; + } + + template <typename I> +@@ -40,7 +40,7 @@ int SharedPersistentObjectCacher<I>::read_object(std::string oid, ceph::bufferli + std::string cache_file_name = m_image_ctx->data_ctx.get_pool_name() + oid; + + //TODO(): make a cache for cachefile fd +- os::CacheStore::SyncFile* target_cache_file = new os::CacheStore::SyncFile(cct, cache_file_name); ++ SyncFile* target_cache_file = new SyncFile(cct, cache_file_name); + target_cache_file->open(); + + int ret = target_cache_file->read_object_from_file(read_data, offset, length); +diff --git a/src/librbd/cache/SharedPersistentObjectCacher.h b/src/librbd/cache/SharedPersistentObjectCacher.h +index d108a05..af04e63 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacher.h ++++ b/src/librbd/cache/SharedPersistentObjectCacher.h +@@ -6,7 +6,7 @@ + + #include "include/buffer_fwd.h" + #include "include/int_types.h" +-#include "os/CacheStore/SyncFile.h" ++#include "SharedPersistentObjectCacherFile.h" + #include "common/Mutex.h" + #include <vector> + #include <map> +@@ -31,9 +31,9 @@ public: + + private: + ImageCtxT *m_image_ctx; +- std::map<std::string, os::CacheStore::SyncFile*> file_map; +- Mutex m_file_map_lock; + std::string m_cache_path; ++ Mutex m_file_map_lock; ++ std::map<std::string, SyncFile*> file_map; + + }; + +diff --git a/src/librbd/cache/SharedPersistentObjectCacherFile.cc b/src/librbd/cache/SharedPersistentObjectCacherFile.cc +new file mode 100644 +index 0000000..75a3053 +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacherFile.cc +@@ -0,0 +1,114 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "SharedPersistentObjectCacherFile.h" ++#include "include/Context.h" ++#include "common/dout.h" ++#include "common/WorkQueue.h" ++#include "librbd/ImageCtx.h" ++#include <sys/types.h> ++#include <sys/stat.h> ++#include <aio.h> ++#include <errno.h> ++#include <fcntl.h> ++#include <utility> ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::cache::SyncFile: " << this << " " \ ++ << __func__ << ": " ++ ++namespace librbd { ++namespace cache { ++ ++SyncFile::SyncFile(CephContext *cct, const std::string &name) ++ : cct(cct) { ++ m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; ++ ldout(cct, 20) << "file path=" << m_name << dendl; ++} ++ ++SyncFile::~SyncFile() { ++ // TODO force proper cleanup ++ if (m_fd != -1) { ++ ::close(m_fd); ++ } ++} ++ ++void SyncFile::open(Context *on_finish) { ++ while (true) { ++ m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC, ++ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); ++ if (m_fd == -1) { ++ int r = -errno; ++ if (r == -EINTR) { ++ continue; ++ } ++ on_finish->complete(r); ++ return; ++ } ++ break; ++ } ++ ++ on_finish->complete(0); ++} ++ ++void SyncFile::open() { ++ while (true) ++ { ++ m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC, ++ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); ++ if (m_fd == -1) ++ { ++ int r = -errno; ++ if (r == -EINTR) { ++ continue; ++ } ++ return; ++ } ++ break; ++ } ++} ++ ++void SyncFile::read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish) { ++ on_finish->complete(read_object_from_file(bl, offset, length)); ++} ++ ++void SyncFile::write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish) { ++ on_finish->complete(write_object_to_file(bl, bl.length())); ++} ++ ++int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) { ++ ++ ldout(cct, 20) << "cache file name:" << m_name ++ << ", length:" << object_len << dendl; ++ ++ // TODO(): aio ++ int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0); ++ if(ret < 0) { ++ lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl; ++ return ret; ++ } ++ ++ return ret; ++} ++ ++int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) { ++ ++ ldout(cct, 20) << "offset:" << object_off ++ << ", length:" << object_len << dendl; ++ ++ bufferptr buf(object_len); ++ ++ // TODO(): aio ++ int ret = pread(m_fd, buf.c_str(), object_len, object_off); ++ if(ret < 0) { ++ lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl; ++ return ret; ++ } ++ read_buf->append(std::move(buf)); ++ ++ return ret; ++} ++ ++} // namespace cache ++} // namespace librbd +diff --git a/src/librbd/cache/SharedPersistentObjectCacherFile.h b/src/librbd/cache/SharedPersistentObjectCacherFile.h +new file mode 100644 +index 0000000..ccbe730 +--- /dev/null ++++ b/src/librbd/cache/SharedPersistentObjectCacherFile.h +@@ -0,0 +1,74 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBRBD_CACHE_STORE_SYNC_FILE ++#define CEPH_LIBRBD_CACHE_STORE_SYNC_FILE ++ ++#include "include/buffer_fwd.h" ++#include <sys/mman.h> ++#include <string> ++ ++struct Context; ++struct ContextWQ; ++class CephContext; ++ ++namespace librbd { ++ ++namespace cache { ++ ++class SyncFile { ++public: ++ SyncFile(CephContext *cct, const std::string &name); ++ ~SyncFile(); ++ ++ // TODO use IO queue instead of individual commands so operations can be ++ // submitted in batch ++ ++ // TODO use scatter/gather API ++ ++ void open(Context *on_finish); ++ ++ // ## ++ void open(); ++ bool try_open(); ++ void close(Context *on_finish); ++ void remove(Context *on_finish); ++ ++ void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish); ++ ++ void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish); ++ ++ void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish); ++ ++ void truncate(uint64_t length, bool fdatasync, Context *on_finish); ++ ++ void fsync(Context *on_finish); ++ ++ void fdatasync(Context *on_finish); ++ ++ uint64_t filesize(); ++ ++ int load(void** dest, uint64_t filesize); ++ ++ int remove(); ++ ++ // ## ++ int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len); ++ int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len); ++ ++private: ++ CephContext *cct; ++ std::string m_name; ++ int m_fd = -1; ++ ++ int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync); ++ int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl); ++ int discard(uint64_t offset, uint64_t length, bool fdatasync); ++ int truncate(uint64_t length, bool fdatasync); ++ int fdatasync(); ++}; ++ ++} // namespace cache ++} // namespace librbd ++ ++#endif // CEPH_LIBRBD_CACHE_STORE_SYNC_FILE +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +index 90d886c..2aa5cad 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +@@ -52,7 +52,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() { + ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; + + std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; +- m_cache_client = new CacheClient(io_service, controller_path.c_str(), ++ m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(), + ([&](std::string s){client_handle_request(s);})); + + int ret = m_cache_client->connect(); +@@ -120,7 +120,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s + auto cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; + +- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); ++ rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); + + switch (io_ctx->type) { + case RBDSC_REGISTER_REPLY: { +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +index 1ede804..200688f 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +@@ -115,7 +115,7 @@ private: + ImageCtxT* m_image_ctx; + + void client_handle_request(std::string msg); +- CacheClient *m_cache_client = nullptr; ++ rbd::cache::CacheClient *m_cache_client = nullptr; + boost::asio::io_service io_service; + }; + +diff --git a/src/os/CacheStore/SyncFile.cc b/src/os/CacheStore/SyncFile.cc +deleted file mode 100644 +index 5352bde..0000000 +--- a/src/os/CacheStore/SyncFile.cc ++++ /dev/null +@@ -1,110 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "os/CacheStore/SyncFile.h" +-#include "include/Context.h" +-#include "common/dout.h" +-#include "common/WorkQueue.h" +-#include "librbd/ImageCtx.h" +-#include <sys/types.h> +-#include <sys/stat.h> +-#include <aio.h> +-#include <errno.h> +-#include <fcntl.h> +-#include <utility> +- +-#define dout_subsys ceph_subsys_rbd +-#undef dout_prefix +-#define dout_prefix *_dout << "librbd::file::SyncFile: " << this << " " \ +- << __func__ << ": " +- +-namespace os { +-namespace CacheStore { +- +-SyncFile::SyncFile(CephContext *cct, const std::string &name) +- : cct(cct) +-{ +- m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; +- ldout(cct, 20) << "file path=" << m_name << dendl; +-} +- +-SyncFile::~SyncFile() +-{ +- // TODO force proper cleanup +- if (m_fd != -1) { +- ::close(m_fd); +- } +-} +- +-void SyncFile::open(Context *on_finish) +-{ +- while (true) { +- m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC, +- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); +- if (m_fd == -1) { +- int r = -errno; +- if (r == -EINTR) { +- continue; +- } +- on_finish->complete(r); +- return; +- } +- break; +- } +- +- on_finish->complete(0); +-} +- +-void SyncFile::open() +-{ +- while (true) +- { +- m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC, +- S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP); +- if (m_fd == -1) +- { +- int r = -errno; +- if (r == -EINTR) { +- continue; +- } +- return; +- } +- break; +- } +-} +- +-int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) { +- +- ldout(cct, 20) << "cache file name:" << m_name +- << ", length:" << object_len << dendl; +- +- // TODO(): aio +- int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0); +- if(ret < 0) { +- lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl; +- return ret; +- } +- +- return ret; +-} +- +-int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) { +- +- ldout(cct, 20) << "offset:" << object_off +- << ", length:" << object_len << dendl; +- +- bufferptr buf(object_len); +- +- // TODO(): aio +- int ret = pread(m_fd, buf.c_str(), object_len, object_off); +- if(ret < 0) { +- lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl; +- return ret; +- } +- read_buf->append(std::move(buf)); +- +- return ret; +-} +- +-} // namespace CacheStore +-} // namespace os +diff --git a/src/os/CacheStore/SyncFile.h b/src/os/CacheStore/SyncFile.h +deleted file mode 100644 +index 81602ce..0000000 +--- a/src/os/CacheStore/SyncFile.h ++++ /dev/null +@@ -1,74 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CEPH_LIBOS_CACHE_STORE_SYNC_FILE +-#define CEPH_LIBOS_CACHE_STORE_SYNC_FILE +- +-#include "include/buffer_fwd.h" +-#include <sys/mman.h> +-#include <string> +- +-struct Context; +-struct ContextWQ; +-class CephContext; +- +-namespace os { +- +-namespace CacheStore { +- +-class SyncFile { +-public: +- SyncFile(CephContext *cct, const std::string &name); +- ~SyncFile(); +- +- // TODO use IO queue instead of individual commands so operations can be +- // submitted in batch +- +- // TODO use scatter/gather API +- +- void open(Context *on_finish); +- +- // ## +- void open(); +- bool try_open(); +- void close(Context *on_finish); +- void remove(Context *on_finish); +- +- void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish); +- +- void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish); +- +- void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish); +- +- void truncate(uint64_t length, bool fdatasync, Context *on_finish); +- +- void fsync(Context *on_finish); +- +- void fdatasync(Context *on_finish); +- +- uint64_t filesize(); +- +- int load(void** dest, uint64_t filesize); +- +- int remove(); +- +- // ## +- int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len); +- int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len); +- +-private: +- CephContext *cct; +- std::string m_name; +- int m_fd = -1; +- +- int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync); +- int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl); +- int discard(uint64_t offset, uint64_t length, bool fdatasync); +- int truncate(uint64_t length, bool fdatasync); +- int fdatasync(); +-}; +- +-} // namespace CacheStore +-} // namespace os +- +-#endif // CEPH_LIBOS_CACHE_STORE_SYNC_FILE +diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt +index 08eae60..597d802 100644 +--- a/src/tools/rbd_cache/CMakeLists.txt ++++ b/src/tools/rbd_cache/CMakeLists.txt +@@ -1,5 +1,5 @@ + add_executable(rbd-cache +- ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc ++ ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc + ObjectCacheStore.cc + CacheController.cc + main.cc) +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +index c914358..e73ba25 100644 +--- a/src/tools/rbd_cache/CacheController.cc ++++ b/src/tools/rbd_cache/CacheController.cc +@@ -1,7 +1,7 @@ + // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- + // vim: ts=8 sw=2 smarttab + +-#include "CacheController.hpp" ++#include "CacheController.h" + + #define dout_context g_ceph_context + #define dout_subsys ceph_subsys_rbd_cache +@@ -9,6 +9,8 @@ + #define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ + << __func__ << ": " + ++namespace rbd { ++namespace cache { + + class ThreadPoolSingleton : public ThreadPool { + public: +@@ -103,3 +105,8 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ + + } + } ++ ++} // namespace rbd ++} // namespace cache ++ ++ +diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h +new file mode 100644 +index 0000000..0e3abc1 +--- /dev/null ++++ b/src/tools/rbd_cache/CacheController.h +@@ -0,0 +1,55 @@ ++#ifndef CACHE_CONTROLLER_H ++#define CACHE_CONTROLLER_H ++ ++#include <thread> ++ ++#include "common/Formatter.h" ++#include "common/admin_socket.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "common/WorkQueue.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "include/assert.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++ ++#include "CacheControllerSocket.hpp" ++#include "ObjectCacheStore.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace rbd { ++namespace cache { ++ ++class CacheController { ++ public: ++ CacheController(CephContext *cct, const std::vector<const char*> &args); ++ ~CacheController(); ++ ++ int init(); ++ ++ int shutdown(); ++ ++ void handle_signal(int sinnum); ++ ++ void run(); ++ ++ void handle_request(uint64_t sesstion_id, std::string msg); ++ ++ private: ++ boost::asio::io_service io_service; ++ CacheServer *m_cache_server; ++ std::vector<const char*> m_args; ++ CephContext *m_cct; ++ ObjectCacheStore *m_object_cache_store; ++ ContextWQ* pcache_op_work_queue; ++}; ++ ++} // namespace rbd ++} // namespace cache ++ ++#endif +diff --git a/src/tools/rbd_cache/CacheController.hpp b/src/tools/rbd_cache/CacheController.hpp +deleted file mode 100644 +index 97113e4..0000000 +--- a/src/tools/rbd_cache/CacheController.hpp ++++ /dev/null +@@ -1,49 +0,0 @@ +-#ifndef CACHE_CONTROLLER_H +-#define CACHE_CONTROLLER_H +- +-#include <thread> +- +-#include "common/Formatter.h" +-#include "common/admin_socket.h" +-#include "common/debug.h" +-#include "common/errno.h" +-#include "common/ceph_context.h" +-#include "common/Mutex.h" +-#include "common/WorkQueue.h" +-#include "include/rados/librados.hpp" +-#include "include/rbd/librbd.h" +-#include "include/assert.h" +-#include "librbd/ImageCtx.h" +-#include "librbd/ImageState.h" +- +-#include "CacheControllerSocket.hpp" +-#include "ObjectCacheStore.h" +- +- +-using boost::asio::local::stream_protocol; +- +-class CacheController { +- public: +- CacheController(CephContext *cct, const std::vector<const char*> &args); +- ~CacheController(); +- +- int init(); +- +- int shutdown(); +- +- void handle_signal(int sinnum); +- +- void run(); +- +- void handle_request(uint64_t sesstion_id, std::string msg); +- +- private: +- boost::asio::io_service io_service; +- CacheServer *m_cache_server; +- std::vector<const char*> m_args; +- CephContext *m_cct; +- ObjectCacheStore *m_object_cache_store; +- ContextWQ* pcache_op_work_queue; +-}; +- +-#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +index 6e1a743..967af1d 100644 +--- a/src/tools/rbd_cache/CacheControllerSocket.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp +@@ -17,6 +17,9 @@ + + using boost::asio::local::stream_protocol; + ++namespace rbd { ++namespace cache { ++ + class session : public std::enable_shared_from_this<session> { + public: + session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) +@@ -122,4 +125,7 @@ private: + std::map<uint64_t, session_ptr> session_map; + }; + ++} // namespace cache ++} // namespace rbd ++ + #endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +index 8e61aa9..57be78e 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -13,6 +13,9 @@ + + using boost::asio::local::stream_protocol; + ++namespace rbd { ++namespace cache { ++ + class CacheClient { + public: + CacheClient(boost::asio::io_service& io_service, +@@ -128,4 +131,6 @@ public: + bool connected = false; + }; + ++} // namespace cache ++} // namespace rbd + #endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +index e253bb1..ab89155 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h ++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h +@@ -12,6 +12,9 @@ + #define RBDSC_LOOKUP_REPLY 0X16 + #define RBDSC_READ_RADOS 0X16 + ++namespace rbd { ++namespace cache { ++ + typedef std::function<void(uint64_t, std::string)> ProcessMsg; + typedef std::function<void(std::string)> ClientProcessMsg; + typedef uint8_t rbdsc_req_type; +@@ -40,4 +43,6 @@ struct rbdsc_req_type_t { + } + }; + ++} // namespace cache ++} // namespace rbd + #endif +diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc +index 90b407c..9572a1a 100644 +--- a/src/tools/rbd_cache/ObjectCacheStore.cc ++++ b/src/tools/rbd_cache/ObjectCacheStore.cc +@@ -9,6 +9,8 @@ + #define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ + << __func__ << ": " + ++namespace rbd { ++namespace cache { + + ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) + : m_cct(cct), m_work_queue(work_queue), +@@ -77,7 +79,7 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) + } + + // persistent to cache +- os::CacheStore::SyncFile cache_file(m_cct, cache_file_name); ++ librbd::cache::SyncFile cache_file(m_cct, cache_file_name); + cache_file.open(); + ret = cache_file.write_object_to_file(read_buf, object_size); + +@@ -145,3 +147,6 @@ int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_ + return ret; + + } ++ ++} // namespace cache ++} // namespace rbd +diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h +index 12f8399..a81beea 100644 +--- a/src/tools/rbd_cache/ObjectCacheStore.h ++++ b/src/tools/rbd_cache/ObjectCacheStore.h +@@ -12,11 +12,14 @@ + #include "include/rbd/librbd.h" + #include "librbd/ImageCtx.h" + #include "librbd/ImageState.h" +-#include "os/CacheStore/SyncFile.h" ++#include "librbd/cache/SharedPersistentObjectCacherFile.h" + + using librados::Rados; + using librados::IoCtx; + ++namespace rbd { ++namespace cache { ++ + typedef shared_ptr<librados::Rados> RadosRef; + typedef shared_ptr<librados::IoCtx> IoCtxRef; + +@@ -59,7 +62,9 @@ class ObjectCacheStore + + std::map<std::string, librados::IoCtx*> m_ioctxs; + +- os::CacheStore::SyncFile *m_cache_file; ++ librbd::cache::SyncFile *m_cache_file; + }; + ++} // namespace rbd ++} // namespace cache + #endif +diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc +index 336a581..a7c5b64 100644 +--- a/src/tools/rbd_cache/main.cc ++++ b/src/tools/rbd_cache/main.cc +@@ -7,11 +7,11 @@ + #include "common/errno.h" + #include "global/global_init.h" + #include "global/signal_handler.h" +-#include "CacheController.hpp" ++#include "CacheController.h" + + #include <vector> + +-CacheController *cachectl = nullptr; ++rbd::cache::CacheController *cachectl = nullptr; + + void usage() { + std::cout << "usage: cache controller [options...]" << std::endl; +@@ -64,7 +64,7 @@ int main(int argc, const char **argv) + // disable unnecessary librbd cache + g_ceph_context->_conf->set_val_or_die("rbd_cache", "false"); + +- cachectl = new CacheController(g_ceph_context, cmd_args); ++ cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args); + int r = cachectl->init(); + if (r < 0) { + std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; +-- +2.7.4 + diff --git a/src/ceph/0003-librbd-fix-bufferlist-point.patch b/src/ceph/0003-librbd-fix-bufferlist-point.patch new file mode 100644 index 0000000..b0030ea --- /dev/null +++ b/src/ceph/0003-librbd-fix-bufferlist-point.patch @@ -0,0 +1,71 @@ +From 656b499b00fb237c83b8ccccc519a1577d981199 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Mon, 6 Aug 2018 15:38:35 +0800 +Subject: [PATCH 03/10] librbd: fix bufferlist point + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/tools/rbd_cache/ObjectCacheStore.cc | 10 +++++----- + src/tools/rbd_cache/ObjectCacheStore.h | 2 +- + 2 files changed, 6 insertions(+), 6 deletions(-) + +diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc +index 9572a1a..2a87469 100644 +--- a/src/tools/rbd_cache/ObjectCacheStore.cc ++++ b/src/tools/rbd_cache/ObjectCacheStore.cc +@@ -63,13 +63,13 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) + m_cache_table.emplace(cache_file_name, PROMOTING); + } + +- librados::bufferlist read_buf; ++ librados::bufferlist* read_buf = new librados::bufferlist(); + int object_size = 4096*1024; //TODO(): read config from image metadata + + //TODO(): async promote + ret = promote_object(ioctx, object_name, read_buf, object_size); + if (ret == -ENOENT) { +- read_buf.append(std::string(object_size, '0')); ++ read_buf->append(std::string(object_size, '0')); + ret = 0; + } + +@@ -81,7 +81,7 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) + // persistent to cache + librbd::cache::SyncFile cache_file(m_cct, cache_file_name); + cache_file.open(); +- ret = cache_file.write_object_to_file(read_buf, object_size); ++ ret = cache_file.write_object_to_file(*read_buf, object_size); + + assert(m_cache_table.find(cache_file_name) != m_cache_table.end()); + +@@ -132,12 +132,12 @@ int ObjectCacheStore::lock_cache(std::string vol_name) { + return 0; + } + +-int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist read_buf, uint64_t read_len) { ++int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { + int ret; + + librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); + +- ret = ioctx->aio_read(object_name, read_completion, &read_buf, read_len, 0); ++ ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); + if(ret < 0) { + lderr(m_cct) << "fail to read from rados" << dendl; + return ret; +diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h +index a81beea..db09efa 100644 +--- a/src/tools/rbd_cache/ObjectCacheStore.h ++++ b/src/tools/rbd_cache/ObjectCacheStore.h +@@ -45,7 +45,7 @@ class ObjectCacheStore + int do_promote(std::string pool_name, std::string object_name); + + int promote_object(librados::IoCtx*, std::string object_name, +- librados::bufferlist read_buf, ++ librados::bufferlist* read_buf, + uint64_t length); + + enum { +-- +2.7.4 + diff --git a/src/ceph/0004-librbd-fix-lookup-object-return.patch b/src/ceph/0004-librbd-fix-lookup-object-return.patch new file mode 100644 index 0000000..0a2e2eb --- /dev/null +++ b/src/ceph/0004-librbd-fix-lookup-object-return.patch @@ -0,0 +1,45 @@ +From 779ef67e6a401ad569c5d3d3a076352db8ad0f67 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Mon, 6 Aug 2018 15:47:23 +0800 +Subject: [PATCH 04/10] librbd: fix lookup object return + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/tools/rbd_cache/CacheControllerSocketClient.hpp | 7 ++++++- + src/tools/rbd_cache/CacheControllerSocketCommon.h | 2 +- + 2 files changed, 7 insertions(+), 2 deletions(-) + +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +index 57be78e..4e1f36c 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -99,7 +99,12 @@ public: + boost::asio::transfer_exactly(544), + [this, result](const boost::system::error_code& err, size_t cb) { + if (!err) { +- *result = true; ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); ++ if (io_ctx->type == RBDSC_READ_REPLY) { ++ *result = true; ++ } else { ++ *result = false; ++ } + cv.notify_one(); + m_client_process_msg(std::string(buffer_, cb)); + } else { +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +index ab89155..a9d73a8 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h ++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h +@@ -10,7 +10,7 @@ + #define RBDSC_REGISTER_REPLY 0X14 + #define RBDSC_READ_REPLY 0X15 + #define RBDSC_LOOKUP_REPLY 0X16 +-#define RBDSC_READ_RADOS 0X16 ++#define RBDSC_READ_RADOS 0X17 + + namespace rbd { + namespace cache { +-- +2.7.4 + diff --git a/src/ceph/0005-librbd-fix-conf-get_val.patch b/src/ceph/0005-librbd-fix-conf-get_val.patch new file mode 100644 index 0000000..6bc5268 --- /dev/null +++ b/src/ceph/0005-librbd-fix-conf-get_val.patch @@ -0,0 +1,63 @@ +From be47f5e7980d92379b9feaef2e2f1e1579df94ba Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Mon, 6 Aug 2018 22:05:13 +0800 +Subject: [PATCH 05/10] librbd: fix conf get_val + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/librbd/cache/SharedPersistentObjectCacherFile.cc | 2 +- + src/tools/rbd_cache/CacheController.cc | 2 +- + src/tools/rbd_cache/main.cc | 4 ++-- + 3 files changed, 4 insertions(+), 4 deletions(-) + +diff --git a/src/librbd/cache/SharedPersistentObjectCacherFile.cc b/src/librbd/cache/SharedPersistentObjectCacherFile.cc +index 75a3053..6f9c3e0 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherFile.cc ++++ b/src/librbd/cache/SharedPersistentObjectCacherFile.cc +@@ -23,7 +23,7 @@ namespace cache { + + SyncFile::SyncFile(CephContext *cct, const std::string &name) + : cct(cct) { +- m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; ++ m_name = cct->_conf.get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name; + ldout(cct, 20) << "file path=" << m_name << dendl; + } + +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +index e73ba25..cefcf28 100644 +--- a/src/tools/rbd_cache/CacheController.cc ++++ b/src/tools/rbd_cache/CacheController.cc +@@ -20,7 +20,7 @@ public: + : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, + "pcache_threads"), + op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", +- cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"), ++ cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), + this)) { + start(); + } +diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc +index a7c5b64..d604760 100644 +--- a/src/tools/rbd_cache/main.cc ++++ b/src/tools/rbd_cache/main.cc +@@ -46,7 +46,7 @@ int main(int argc, const char **argv) + } + } + +- if (g_conf->daemonize) { ++ if (g_conf()->daemonize) { + global_init_daemonize(g_ceph_context); + } + g_ceph_context->enable_perf_counter(); +@@ -62,7 +62,7 @@ int main(int argc, const char **argv) + argv_to_vec(argc, argv, cmd_args); + + // disable unnecessary librbd cache +- g_ceph_context->_conf->set_val_or_die("rbd_cache", "false"); ++ g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); + + cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args); + int r = cachectl->init(); +-- +2.7.4 + diff --git a/src/ceph/0006-librbd-LRU-policy-based-eviction.patch b/src/ceph/0006-librbd-LRU-policy-based-eviction.patch new file mode 100644 index 0000000..eb99e3a --- /dev/null +++ b/src/ceph/0006-librbd-LRU-policy-based-eviction.patch @@ -0,0 +1,403 @@ +From b233d6540160c8bc5cc25b870c2140fa48776fa6 Mon Sep 17 00:00:00 2001 +From: Dehao Shang <dehao.shang@intel.com> +Date: Mon, 6 Aug 2018 22:42:38 +0800 +Subject: [PATCH 06/10] librbd: LRU policy based eviction + +Signed-off-by: Dehao Shang <dehao.shang@intel.com> +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/tools/rbd_cache/ObjectCacheStore.cc | 73 ++++++++----- + src/tools/rbd_cache/ObjectCacheStore.h | 14 +-- + src/tools/rbd_cache/Policy.hpp | 22 ++++ + src/tools/rbd_cache/SimplePolicy.hpp | 180 ++++++++++++++++++++++++++++++++ + 4 files changed, 254 insertions(+), 35 deletions(-) + create mode 100644 src/tools/rbd_cache/Policy.hpp + create mode 100644 src/tools/rbd_cache/SimplePolicy.hpp + +diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc +index 2a87469..b39fe66 100644 +--- a/src/tools/rbd_cache/ObjectCacheStore.cc ++++ b/src/tools/rbd_cache/ObjectCacheStore.cc +@@ -14,12 +14,12 @@ namespace cache { + + ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) + : m_cct(cct), m_work_queue(work_queue), +- m_cache_table_lock("rbd::cache::ObjectCacheStore"), + m_rados(new librados::Rados()) { ++ m_policy = new SimplePolicy(4096, 0.9); // TODO + } + + ObjectCacheStore::~ObjectCacheStore() { +- ++ delete m_policy; + } + + int ObjectCacheStore::init(bool reset) { +@@ -43,6 +43,7 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) + int ret = 0; + std::string cache_file_name = pool_name + object_name; + ++ //TODO(): lock on ioctx map + if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { + librados::IoCtx* io_ctx = new librados::IoCtx(); + ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); +@@ -58,10 +59,8 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) + librados::IoCtx* ioctx = m_ioctxs[pool_name]; + + //promoting: update metadata +- { +- Mutex::Locker locker(m_cache_table_lock); +- m_cache_table.emplace(cache_file_name, PROMOTING); +- } ++ m_policy->update_status(cache_file_name, PROMOTING); ++ assert(PROMOTING == m_policy->get_status(cache_file_name)); + + librados::bufferlist* read_buf = new librados::bufferlist(); + int object_size = 4096*1024; //TODO(): read config from image metadata +@@ -83,42 +82,60 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) + cache_file.open(); + ret = cache_file.write_object_to_file(*read_buf, object_size); + +- assert(m_cache_table.find(cache_file_name) != m_cache_table.end()); +- + // update metadata +- { +- Mutex::Locker locker(m_cache_table_lock); +- m_cache_table.emplace(cache_file_name, PROMOTED); +- } ++ assert(PROMOTING == m_policy->get_status(cache_file_name)); ++ m_policy->update_status(cache_file_name, PROMOTED); ++ assert(PROMOTED == m_policy->get_status(cache_file_name)); + + return ret; + + } + ++// return -1, client need to read data from cluster. ++// return 0, client directly read data from cache. + int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { + + std::string cache_file_name = pool_name + object_name; +- { +- Mutex::Locker locker(m_cache_table_lock); +- +- auto it = m_cache_table.find(cache_file_name); +- if (it != m_cache_table.end()) { +- +- if (it->second == PROMOTING) { +- return -1; +- } else if (it->second == PROMOTED) { +- return 0; +- } else { +- assert(0); +- } +- } ++ ++ // TODO lookup and return status; ++ ++ CACHESTATUS ret; ++ ret = m_policy->lookup_object(cache_file_name); ++ ++ switch(ret) { ++ case NONE: ++ return do_promote(pool_name, object_name); ++ case PROMOTING: ++ return -1; ++ case PROMOTED: ++ return 0; ++ default: ++ return -1; + } ++} + +- int ret = do_promote(pool_name, object_name); ++void ObjectCacheStore::evict_thread_body() { ++ int ret; ++ while(m_evict_go) { ++ std::string temp_cache_file; + +- return ret; ++ ret = m_policy->evict_object(temp_cache_file); ++ if(ret == 0) { ++ continue; ++ } ++ ++ // TODO ++ // delete temp_cache_file file. ++ ++ assert(EVICTING == m_policy->get_status(temp_cache_file)); ++ ++ m_policy->update_status(temp_cache_file, EVICTED); ++ ++ assert(NONE == m_policy->get_status(temp_cache_file)); ++ } + } + ++ + int ObjectCacheStore::shutdown() { + m_rados->shutdown(); + return 0; +diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h +index db09efa..5118a73 100644 +--- a/src/tools/rbd_cache/ObjectCacheStore.h ++++ b/src/tools/rbd_cache/ObjectCacheStore.h +@@ -13,6 +13,7 @@ + #include "librbd/ImageCtx.h" + #include "librbd/ImageState.h" + #include "librbd/cache/SharedPersistentObjectCacherFile.h" ++#include "SimplePolicy.hpp" + + using librados::Rados; + using librados::IoCtx; +@@ -39,6 +40,8 @@ class ObjectCacheStore + + int lock_cache(std::string vol_name); + ++ void evict_thread_body(); ++ + private: + int _evict_object(); + +@@ -48,21 +51,18 @@ class ObjectCacheStore + librados::bufferlist* read_buf, + uint64_t length); + +- enum { +- PROMOTING = 0, +- PROMOTED, +- }; +- + CephContext *m_cct; + ContextWQ* m_work_queue; +- Mutex m_cache_table_lock; + RadosRef m_rados; + +- std::map<std::string, uint8_t> m_cache_table; + + std::map<std::string, librados::IoCtx*> m_ioctxs; + + librbd::cache::SyncFile *m_cache_file; ++ ++ Policy* m_policy; ++ ++ bool m_evict_go; + }; + + } // namespace rbd +diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp +new file mode 100644 +index 0000000..575c294 +--- /dev/null ++++ b/src/tools/rbd_cache/Policy.hpp +@@ -0,0 +1,22 @@ ++#ifndef RBD_CACHE_POLICY_HPP ++#define RBD_CACHE_POLICY_HPP ++ ++enum CACHESTATUS { ++ NONE = 0, ++ PROMOTING, ++ PROMOTED, ++ EVICTING, ++ EVICTED, ++}; ++ ++ ++class Policy { ++public: ++ Policy(){} ++ virtual ~Policy(){}; ++ virtual CACHESTATUS lookup_object(std::string) = 0; ++ virtual int evict_object(std::string&) = 0; ++ virtual void update_status(std::string, CACHESTATUS) = 0; ++ virtual CACHESTATUS get_status(std::string) = 0; ++}; ++#endif +diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp +new file mode 100644 +index 0000000..a0d8de7 +--- /dev/null ++++ b/src/tools/rbd_cache/SimplePolicy.hpp +@@ -0,0 +1,180 @@ ++#ifndef RBD_CACHE_SIMPLE_POLICY_HPP ++#define RBD_CACHE_SIMPLE_POLICY_HPP ++ ++#include "Policy.hpp" ++#include "include/lru.h" ++#include "common/Mutex.h" ++ ++#include <vector> ++#include <unordered_map> ++#include <string> ++ ++class SimplePolicy : public Policy { ++public: ++ SimplePolicy(uint64_t block_num, float level) ++ : m_level(level), ++ m_lock("SimplePolicy"), ++ m_entry_count(block_num) ++ { ++ ++ Entry m_entries[m_entry_count]; ++ ++ for(auto &entry : m_entries) { ++ m_free_lru.lru_insert_bot(&entry); ++ } ++ } ++ ++ ~SimplePolicy() {} ++ ++ CACHESTATUS lookup_object(std::string cache_file_name) { ++ Mutex::Locker locker(m_lock); ++ ++ auto entry_it = m_oid_to_entry.find(cache_file_name); ++ if(entry_it == m_oid_to_entry.end()) { ++ return NONE; ++ } ++ ++ Entry* entry = entry_it->second; ++ ++ LRU* lru; ++ if(entry->status == PROMOTED) { ++ lru = &m_promoted_lru; ++ } else { ++ lru = &m_handing_lru; ++ } ++ ++ // touch it ++ lru->lru_remove(entry); ++ lru->lru_insert_top(entry); ++ ++ return entry->status; ++ } ++ ++ int evict_object(std::string& out_cache_file_name) { ++ Mutex::Locker locker(m_lock); ++ ++ // still have enough free space, don't need to evict lru. ++ uint64_t temp_current_size = m_oid_to_entry.size(); ++ float temp_current_evict_level = temp_current_size / m_entry_count; ++ if(temp_current_evict_level < m_level) { ++ return 0; ++ } ++ ++ // when all entries are USING, PROMOTING or EVICTING, just busy waiting. ++ if(m_promoted_lru.lru_get_size() == 0) { ++ return 0; ++ } ++ ++ assert(m_promoted_lru.lru_get_size() != 0); ++ ++ // evict one item from promoted lru ++ Entry *entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_get_next_expire()); ++ assert(entry != nullptr); ++ ++ assert(entry->status == PROMOTED); ++ ++ out_cache_file_name = entry->cache_file_name; ++ entry->status = EVICTING; ++ ++ m_promoted_lru.lru_remove(entry); ++ m_handing_lru.lru_insert_top(entry); ++ ++ return 1; ++ } ++ ++ // TODO(): simplify the logic ++ void update_status(std::string _file_name, CACHESTATUS _status) { ++ Mutex::Locker locker(m_lock); ++ ++ Entry* entry; ++ auto entry_it = m_oid_to_entry.find(_file_name); ++ ++ // just check. ++ if(_status == PROMOTING) { ++ assert(m_oid_to_entry.find(_file_name) == m_oid_to_entry.end()); ++ } ++ ++ // miss this object. ++ if(entry_it == m_oid_to_entry.end() && _status == PROMOTING) { ++ entry = reinterpret_cast<Entry*>(m_free_lru.lru_get_next_expire()); ++ if(entry == nullptr) { ++ assert(0); // namely evict thread have some problems. ++ } ++ ++ entry->status = PROMOTING; ++ ++ m_oid_to_entry[_file_name] = entry; ++ m_free_lru.lru_remove(entry); ++ m_handing_lru.lru_insert_top(entry); ++ ++ return; ++ } ++ ++ assert(entry_it != m_oid_to_entry.end()); ++ ++ entry = entry_it->second; ++ ++ // promoting action have been finished, so update it. ++ if(entry->status == PROMOTING && _status== PROMOTED) { ++ m_handing_lru.lru_remove(entry); ++ m_promoted_lru.lru_insert_top(entry); ++ entry->status = PROMOTED; ++ return; ++ } ++ ++ // will delete this cache file ++ if(entry->status == PROMOTED && _status == EVICTING) { ++ m_promoted_lru.lru_remove(entry); ++ m_handing_lru.lru_insert_top(entry); ++ entry->status = EVICTING; ++ return; ++ } ++ ++ ++ if(_status == EVICTED) { ++ m_oid_to_entry.erase(entry_it); ++ m_handing_lru.lru_remove(entry); ++ m_free_lru.lru_insert_bot(entry); ++ return; ++ } ++ ++ assert(0); ++ } ++ ++ // get entry status ++ CACHESTATUS get_status(std::string _file_name) { ++ Mutex::Locker locker(m_lock); ++ auto entry_it = m_oid_to_entry.find(_file_name); ++ if(entry_it == m_oid_to_entry.end()) { ++ return NONE; ++ } ++ ++ return entry_it->second->status; ++ } ++ ++ ++private: ++ ++ class Entry : public LRUObject { ++ public: ++ CACHESTATUS status; ++ Entry() : status(NONE){} ++ std::string cache_file_name; ++ void encode(bufferlist &bl){} ++ void decode(bufferlist::iterator &it){} ++ }; ++ ++ std::unordered_map<std::string, Entry*> m_oid_to_entry; ++ ++ LRU m_free_lru; ++ LRU m_handing_lru; // include promoting status or evicting status ++ LRU m_promoted_lru; // include promoted, using status. ++ ++ mutable Mutex m_lock; ++ ++ float m_level; ++ uint64_t m_entry_count; ++ ++}; ++ ++#endif +-- +2.7.4 + diff --git a/src/ceph/0007-librbd-cleanup-policy-based-promotion-eviction.patch b/src/ceph/0007-librbd-cleanup-policy-based-promotion-eviction.patch new file mode 100644 index 0000000..010407b --- /dev/null +++ b/src/ceph/0007-librbd-cleanup-policy-based-promotion-eviction.patch @@ -0,0 +1,512 @@ +From dd4804fb05ad8aca51516b0112975cc91ef85a6b Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Wed, 8 Aug 2018 15:31:47 +0800 +Subject: [PATCH 07/10] librbd: cleanup policy based promotion/eviction + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/common/options.cc | 4 + + .../rbd_cache/CacheControllerSocketClient.hpp | 3 +- + src/tools/rbd_cache/ObjectCacheStore.cc | 63 +++---- + src/tools/rbd_cache/ObjectCacheStore.h | 10 +- + src/tools/rbd_cache/Policy.hpp | 18 +- + src/tools/rbd_cache/SimplePolicy.hpp | 188 +++++++++------------ + 6 files changed, 141 insertions(+), 145 deletions(-) + +diff --git a/src/common/options.cc b/src/common/options.cc +index 7839a31..b334c1e 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() { + .set_default("/tmp") + .set_description("shared ssd caching data dir"), + ++ Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED) ++ .set_default(4096) ++ .set_description("shared ssd caching data entries"), ++ + Option("rbd_non_blocking_aio", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) + .set_default(true) + .set_description("process AIO ops from a dispatch thread to prevent blocking"), +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +index 4e1f36c..56b79ce 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -90,7 +90,8 @@ public: + } + }); + std::unique_lock<std::mutex> lk(m); +- cv.wait(lk); ++ //cv.wait(lk); ++ cv.wait_for(lk, std::chrono::milliseconds(100)); + return 0; + } + +diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc +index b39fe66..99f90d6 100644 +--- a/src/tools/rbd_cache/ObjectCacheStore.cc ++++ b/src/tools/rbd_cache/ObjectCacheStore.cc +@@ -15,7 +15,12 @@ namespace cache { + ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) + : m_cct(cct), m_work_queue(work_queue), + m_rados(new librados::Rados()) { +- m_policy = new SimplePolicy(4096, 0.9); // TODO ++ ++ uint64_t object_cache_entries = ++ cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); ++ ++ //TODO(): allow to set level ++ m_policy = new SimplePolicy(object_cache_entries, 0.5); + } + + ObjectCacheStore::~ObjectCacheStore() { +@@ -35,7 +40,16 @@ int ObjectCacheStore::init(bool reset) { + lderr(m_cct) << "fail to conect to cluster" << dendl; + return ret; + } +- //TODO(): check existing cache objects ++ ++ std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); ++ //TODO(): check and reuse existing cache objects ++ if(reset) { ++ std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; ++ //TODO(): to use std::filesystem ++ int r = system(cmd.c_str()); ++ } ++ ++ evict_thd = new std::thread([this]{this->evict_thread_body();}); + return ret; + } + +@@ -58,10 +72,6 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) + + librados::IoCtx* ioctx = m_ioctxs[pool_name]; + +- //promoting: update metadata +- m_policy->update_status(cache_file_name, PROMOTING); +- assert(PROMOTING == m_policy->get_status(cache_file_name)); +- + librados::bufferlist* read_buf = new librados::bufferlist(); + int object_size = 4096*1024; //TODO(): read config from image metadata + +@@ -83,9 +93,9 @@ int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) + ret = cache_file.write_object_to_file(*read_buf, object_size); + + // update metadata +- assert(PROMOTING == m_policy->get_status(cache_file_name)); +- m_policy->update_status(cache_file_name, PROMOTED); +- assert(PROMOTED == m_policy->get_status(cache_file_name)); ++ assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); ++ m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); ++ assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); + + return ret; + +@@ -97,18 +107,15 @@ int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_na + + std::string cache_file_name = pool_name + object_name; + +- // TODO lookup and return status; +- + CACHESTATUS ret; + ret = m_policy->lookup_object(cache_file_name); + + switch(ret) { +- case NONE: ++ case OBJ_CACHE_NONE: + return do_promote(pool_name, object_name); +- case PROMOTING: +- return -1; +- case PROMOTED: ++ case OBJ_CACHE_PROMOTED: + return 0; ++ case OBJ_CACHE_PROMOTING: + default: + return -1; + } +@@ -117,26 +124,14 @@ int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_na + void ObjectCacheStore::evict_thread_body() { + int ret; + while(m_evict_go) { +- std::string temp_cache_file; +- +- ret = m_policy->evict_object(temp_cache_file); +- if(ret == 0) { +- continue; +- } +- +- // TODO +- // delete temp_cache_file file. +- +- assert(EVICTING == m_policy->get_status(temp_cache_file)); +- +- m_policy->update_status(temp_cache_file, EVICTED); +- +- assert(NONE == m_policy->get_status(temp_cache_file)); ++ ret = evict_objects(); + } + } + + + int ObjectCacheStore::shutdown() { ++ m_evict_go = false; ++ evict_thd->join(); + m_rados->shutdown(); + return 0; + } +@@ -165,5 +160,13 @@ int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_ + + } + ++int ObjectCacheStore::evict_objects() { ++ std::list<std::string> obj_list; ++ m_policy->get_evict_list(&obj_list); ++ for (auto& obj: obj_list) { ++ //do_evict(obj); ++ } ++} ++ + } // namespace cache + } // namespace rbd +diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h +index 5118a73..ba0e1f1 100644 +--- a/src/tools/rbd_cache/ObjectCacheStore.h ++++ b/src/tools/rbd_cache/ObjectCacheStore.h +@@ -15,6 +15,7 @@ + #include "librbd/cache/SharedPersistentObjectCacherFile.h" + #include "SimplePolicy.hpp" + ++ + using librados::Rados; + using librados::IoCtx; + +@@ -40,10 +41,9 @@ class ObjectCacheStore + + int lock_cache(std::string vol_name); + +- void evict_thread_body(); +- + private: +- int _evict_object(); ++ void evict_thread_body(); ++ int evict_objects(); + + int do_promote(std::string pool_name, std::string object_name); + +@@ -61,8 +61,8 @@ class ObjectCacheStore + librbd::cache::SyncFile *m_cache_file; + + Policy* m_policy; +- +- bool m_evict_go; ++ std::thread* evict_thd; ++ bool m_evict_go = false; + }; + + } // namespace rbd +diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp +index 575c294..711e3bd 100644 +--- a/src/tools/rbd_cache/Policy.hpp ++++ b/src/tools/rbd_cache/Policy.hpp +@@ -1,12 +1,16 @@ + #ifndef RBD_CACHE_POLICY_HPP + #define RBD_CACHE_POLICY_HPP + ++#include <list> ++#include <string> ++ ++namespace rbd { ++namespace cache { ++ + enum CACHESTATUS { +- NONE = 0, +- PROMOTING, +- PROMOTED, +- EVICTING, +- EVICTED, ++ OBJ_CACHE_NONE = 0, ++ OBJ_CACHE_PROMOTING, ++ OBJ_CACHE_PROMOTED, + }; + + +@@ -18,5 +22,9 @@ public: + virtual int evict_object(std::string&) = 0; + virtual void update_status(std::string, CACHESTATUS) = 0; + virtual CACHESTATUS get_status(std::string) = 0; ++ virtual void get_evict_list(std::list<std::string>* obj_list) = 0; + }; ++ ++} // namespace cache ++} // namespace rbd + #endif +diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp +index a0d8de7..e785de1 100644 +--- a/src/tools/rbd_cache/SimplePolicy.hpp ++++ b/src/tools/rbd_cache/SimplePolicy.hpp +@@ -3,138 +3,93 @@ + + #include "Policy.hpp" + #include "include/lru.h" ++#include "common/RWLock.h" + #include "common/Mutex.h" + + #include <vector> + #include <unordered_map> + #include <string> + ++namespace rbd { ++namespace cache { ++ ++ + class SimplePolicy : public Policy { + public: +- SimplePolicy(uint64_t block_num, float level) +- : m_level(level), +- m_lock("SimplePolicy"), +- m_entry_count(block_num) ++ SimplePolicy(uint64_t block_num, float watermark) ++ : m_watermark(watermark), m_entry_count(block_num), ++ m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), ++ m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") + { + +- Entry m_entries[m_entry_count]; +- +- for(auto &entry : m_entries) { +- m_free_lru.lru_insert_bot(&entry); ++ for(uint64_t i = 0; i < m_entry_count; i++) { ++ m_free_list.push_back(new Entry()); + } ++ + } + +- ~SimplePolicy() {} ++ ~SimplePolicy() { ++ for(uint64_t i = 0; i < m_entry_count; i++) { ++ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); ++ delete entry; ++ m_free_list.pop_front(); ++ } ++ } + + CACHESTATUS lookup_object(std::string cache_file_name) { +- Mutex::Locker locker(m_lock); + +- auto entry_it = m_oid_to_entry.find(cache_file_name); +- if(entry_it == m_oid_to_entry.end()) { +- return NONE; ++ //TODO(): check race condition ++ RWLock::WLocker wlocker(m_cache_map_lock); ++ ++ auto entry_it = m_cache_map.find(cache_file_name); ++ if(entry_it == m_cache_map.end()) { ++ Mutex::Locker locker(m_free_list_lock); ++ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); ++ assert(entry != nullptr); ++ m_free_list.pop_front(); ++ entry->status = OBJ_CACHE_PROMOTING; ++ ++ m_cache_map[cache_file_name] = entry; ++ ++ return OBJ_CACHE_NONE; + } + + Entry* entry = entry_it->second; + +- LRU* lru; +- if(entry->status == PROMOTED) { +- lru = &m_promoted_lru; +- } else { +- lru = &m_handing_lru; ++ if(entry->status == OBJ_CACHE_PROMOTED) { ++ // touch it ++ m_promoted_lru.lru_touch(entry); + } + +- // touch it +- lru->lru_remove(entry); +- lru->lru_insert_top(entry); +- + return entry->status; + } + + int evict_object(std::string& out_cache_file_name) { +- Mutex::Locker locker(m_lock); +- +- // still have enough free space, don't need to evict lru. +- uint64_t temp_current_size = m_oid_to_entry.size(); +- float temp_current_evict_level = temp_current_size / m_entry_count; +- if(temp_current_evict_level < m_level) { +- return 0; +- } +- +- // when all entries are USING, PROMOTING or EVICTING, just busy waiting. +- if(m_promoted_lru.lru_get_size() == 0) { +- return 0; +- } +- +- assert(m_promoted_lru.lru_get_size() != 0); +- +- // evict one item from promoted lru +- Entry *entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_get_next_expire()); +- assert(entry != nullptr); +- +- assert(entry->status == PROMOTED); +- +- out_cache_file_name = entry->cache_file_name; +- entry->status = EVICTING; +- +- m_promoted_lru.lru_remove(entry); +- m_handing_lru.lru_insert_top(entry); ++ RWLock::WLocker locker(m_cache_map_lock); + + return 1; + } + + // TODO(): simplify the logic +- void update_status(std::string _file_name, CACHESTATUS _status) { +- Mutex::Locker locker(m_lock); ++ void update_status(std::string file_name, CACHESTATUS new_status) { ++ RWLock::WLocker locker(m_cache_map_lock); + + Entry* entry; +- auto entry_it = m_oid_to_entry.find(_file_name); ++ auto entry_it = m_cache_map.find(file_name); + + // just check. +- if(_status == PROMOTING) { +- assert(m_oid_to_entry.find(_file_name) == m_oid_to_entry.end()); ++ if(new_status == OBJ_CACHE_PROMOTING) { ++ assert(entry_it == m_cache_map.end()); + } + +- // miss this object. +- if(entry_it == m_oid_to_entry.end() && _status == PROMOTING) { +- entry = reinterpret_cast<Entry*>(m_free_lru.lru_get_next_expire()); +- if(entry == nullptr) { +- assert(0); // namely evict thread have some problems. +- } +- +- entry->status = PROMOTING; +- +- m_oid_to_entry[_file_name] = entry; +- m_free_lru.lru_remove(entry); +- m_handing_lru.lru_insert_top(entry); +- +- return; +- } +- +- assert(entry_it != m_oid_to_entry.end()); ++ assert(entry_it != m_cache_map.end()); + + entry = entry_it->second; + +- // promoting action have been finished, so update it. +- if(entry->status == PROMOTING && _status== PROMOTED) { +- m_handing_lru.lru_remove(entry); ++ // promoting is done, so update it. ++ if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { + m_promoted_lru.lru_insert_top(entry); +- entry->status = PROMOTED; +- return; +- } +- +- // will delete this cache file +- if(entry->status == PROMOTED && _status == EVICTING) { +- m_promoted_lru.lru_remove(entry); +- m_handing_lru.lru_insert_top(entry); +- entry->status = EVICTING; +- return; +- } +- +- +- if(_status == EVICTED) { +- m_oid_to_entry.erase(entry_it); +- m_handing_lru.lru_remove(entry); +- m_free_lru.lru_insert_bot(entry); ++ entry->status = new_status; + return; + } + +@@ -142,39 +97,64 @@ public: + } + + // get entry status +- CACHESTATUS get_status(std::string _file_name) { +- Mutex::Locker locker(m_lock); +- auto entry_it = m_oid_to_entry.find(_file_name); +- if(entry_it == m_oid_to_entry.end()) { +- return NONE; ++ CACHESTATUS get_status(std::string file_name) { ++ RWLock::RLocker locker(m_cache_map_lock); ++ auto entry_it = m_cache_map.find(file_name); ++ if(entry_it == m_cache_map.end()) { ++ return OBJ_CACHE_NONE; + } + + return entry_it->second->status; + } + ++ void get_evict_list(std::list<std::string>* obj_list) { ++ RWLock::WLocker locker(m_cache_map_lock); ++ // check free ratio, pop entries from LRU ++ if (m_free_list.size() / m_entry_count < m_watermark) { ++ int evict_num = 10; //TODO(): make this configurable ++ for(int i = 0; i < evict_num; i++) { ++ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); ++ if (entry == nullptr) { ++ continue; ++ } ++ std::string file_name = entry->cache_file_name; ++ obj_list->push_back(file_name); ++ ++ auto entry_it = m_cache_map.find(file_name); ++ m_cache_map.erase(entry_it); ++ ++ //mark this entry as free ++ entry->status = OBJ_CACHE_NONE; ++ Mutex::Locker locker(m_free_list_lock); ++ m_free_list.push_back(entry); ++ } ++ } ++ } + + private: + + class Entry : public LRUObject { + public: + CACHESTATUS status; +- Entry() : status(NONE){} ++ Entry() : status(OBJ_CACHE_NONE){} + std::string cache_file_name; + void encode(bufferlist &bl){} + void decode(bufferlist::iterator &it){} + }; + +- std::unordered_map<std::string, Entry*> m_oid_to_entry; ++ float m_watermark; ++ uint64_t m_entry_count; + +- LRU m_free_lru; +- LRU m_handing_lru; // include promoting status or evicting status +- LRU m_promoted_lru; // include promoted, using status. ++ std::unordered_map<std::string, Entry*> m_cache_map; ++ RWLock m_cache_map_lock; + +- mutable Mutex m_lock; ++ std::deque<Entry*> m_free_list; ++ Mutex m_free_list_lock; + +- float m_level; +- uint64_t m_entry_count; ++ LRU m_promoted_lru; // include promoted, using status. + + }; + ++} // namespace cache ++} // namespace rbd + #endif +-- +2.7.4 + diff --git a/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch b/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch new file mode 100644 index 0000000..16dd41f --- /dev/null +++ b/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch @@ -0,0 +1,366 @@ +From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Thu, 16 Aug 2018 17:28:46 +0800 +Subject: [PATCH 08/10] librbd: implement async cache lookup and read + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + .../SharedPersistentObjectCacherObjectDispatch.cc | 63 ++++++++++++---------- + .../SharedPersistentObjectCacherObjectDispatch.h | 7 +++ + src/tools/rbd_cache/CacheController.cc | 9 ++-- + src/tools/rbd_cache/CacheControllerSocket.hpp | 8 ++- + .../rbd_cache/CacheControllerSocketClient.hpp | 49 +++++++++-------- + src/tools/rbd_cache/CacheControllerSocketCommon.h | 12 +++++ + 6 files changed, 94 insertions(+), 54 deletions(-) + +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +index 2aa5cad..407ce49 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +@@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjec + + template <typename I> + SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { +- if (m_object_store) { + delete m_object_store; +- } +- +- if (m_cache_client) { + delete m_cache_client; +- } + } + + template <typename I> +@@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read( + ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" + << object_len << dendl; + +- // ensure we aren't holding the cache lock post-read + on_dispatched = util::create_async_context_callback(*m_image_ctx, + on_dispatched); ++ auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { ++ handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); ++ }); + + if (m_cache_client && m_cache_client->connected && m_object_store) { +- bool exists; + m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), +- m_image_ctx->id, oid, &exists); +- +- // try to read from parent image +- ldout(cct, 20) << "SRO cache object exists:" << exists << dendl; +- if (exists) { +- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); +- if (r != 0) { +- *dispatch_result = io::DISPATCH_RESULT_COMPLETE; +- on_dispatched->complete(r); +- return true; +- } +- } ++ m_image_ctx->id, oid, ctx); + } +- +- ldout(cct, 20) << "Continue read from RADOS" << dendl; +- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; +- on_dispatched->complete(0); + return true; + } + + template <typename I> ++int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( ++ bool cache, ++ const std::string &oid, uint64_t object_off, uint64_t object_len, ++ ceph::bufferlist* read_data, io::DispatchResult* dispatch_result, ++ Context* on_dispatched) { ++ // IO chained in reverse order ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << dendl; ++ ++ // try to read from parent image ++ if (cache) { ++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); ++ if (r != 0) { ++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; ++ //TODO(): complete in syncfile ++ on_dispatched->complete(r); ++ ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl; ++ return true; ++ } ++ } else { ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl; ++ return false; ++ } ++} ++ ++template <typename I> + void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { + auto cct = m_image_ctx->cct; + ldout(cct, 20) << dendl; +@@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s + rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); + + switch (io_ctx->type) { +- case RBDSC_REGISTER_REPLY: { ++ case rbd::cache::RBDSC_REGISTER_REPLY: { + // open cache handler for volume + ldout(cct, 20) << "SRO cache client open cache handler" << dendl; + m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); + + break; + } +- case RBDSC_READ_REPLY: { ++ case rbd::cache::RBDSC_READ_REPLY: { + ldout(cct, 20) << "SRO cache client start to read cache" << dendl; + //TODO(): should call read here + + break; + } +- case RBDSC_READ_RADOS: { ++ case rbd::cache::RBDSC_READ_RADOS: { + ldout(cct, 20) << "SRO cache client start to read rados" << dendl; + //TODO(): should call read here + + break; + } +- default: ldout(cct, 20) << "nothing" << dendl; ++ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; + break; + + } +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +index 200688f..36b868a 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +@@ -112,6 +112,13 @@ public: + + private: + ++ int handle_read_cache( ++ bool cache, ++ const std::string &oid, uint64_t object_off, ++ uint64_t object_len, ceph::bufferlist* read_data, ++ io::DispatchResult* dispatch_result, ++ Context* on_dispatched); ++ + ImageCtxT* m_image_ctx; + + void client_handle_request(std::string msg); +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +index cefcf28..c9d674b 100644 +--- a/src/tools/rbd_cache/CacheController.cc ++++ b/src/tools/rbd_cache/CacheController.cc +@@ -76,7 +76,7 @@ void CacheController::run() { + } + } + +-void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ ++void CacheController::handle_request(uint64_t session_id, std::string msg){ + rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); + + int ret = 0; +@@ -86,7 +86,7 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ + // init cache layout for volume + m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); + io_ctx->type = RBDSC_REGISTER_REPLY; +- m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); ++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); + + break; + } +@@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){ + } else { + io_ctx->type = RBDSC_READ_REPLY; + } +- m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size())); ++ if (io_ctx->type != RBDSC_READ_REPLY) { ++ assert(0); ++ } ++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); + + break; + } +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +index 967af1d..d178b58 100644 +--- a/src/tools/rbd_cache/CacheControllerSocket.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp +@@ -43,7 +43,9 @@ public: + void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { + + if (!error) { +- ++ if(bytes_transferred != 544){ ++ assert(0); ++ } + process_msg(session_id, std::string(data_, bytes_transferred)); + + } +@@ -51,7 +53,8 @@ public: + + void handle_write(const boost::system::error_code& error) { + if (!error) { +- socket_.async_read_some(boost::asio::buffer(data_), ++ boost::asio::async_read(socket_, boost::asio::buffer(data_), ++ boost::asio::transfer_exactly(544), + boost::bind(&session::handle_read, + shared_from_this(), + boost::asio::placeholders::error, +@@ -63,6 +66,7 @@ public: + + boost::asio::async_write(socket_, + boost::asio::buffer(msg.c_str(), msg.size()), ++ boost::asio::transfer_exactly(544), + boost::bind(&session::handle_write, + shared_from_this(), + boost::asio::placeholders::error)); +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +index 56b79ce..3b0ca00 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -8,6 +8,7 @@ + #include <boost/bind.hpp> + #include <boost/algorithm/string.hpp> + #include "include/assert.h" ++#include "include/Context.h" + #include "CacheControllerSocketCommon.h" + + +@@ -26,8 +27,12 @@ public: + m_client_process_msg(processmsg), + ep_(stream_protocol::endpoint(file)) + { +- std::thread thd([this](){io_service_.run(); }); +- thd.detach(); ++ io_thread.reset(new std::thread([this](){io_service_.run(); })); ++ } ++ ++ ~CacheClient() { ++ io_service_.stop(); ++ io_thread->join(); + } + + void run(){ +@@ -53,7 +58,8 @@ public: + message->offset = 0; + message->length = 0; + boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), +- [this](const boost::system::error_code& err, size_t cb) { ++ [this, message](const boost::system::error_code& err, size_t cb) { ++ delete message; + if (!err) { + boost::asio::async_read(socket_, boost::asio::buffer(buffer_), + boost::asio::transfer_exactly(544), +@@ -72,7 +78,7 @@ public: + return 0; + } + +- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) { ++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { + rbdsc_req_type_t *message = new rbdsc_req_type_t(); + message->type = RBDSC_READ; + memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); +@@ -82,49 +88,48 @@ public: + message->length = 0; + + boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), +- [this, result](const boost::system::error_code& err, size_t cb) { ++ [this, on_finish, message](const boost::system::error_code& err, size_t cb) { ++ delete message; + if (!err) { +- get_result(result); ++ get_result(on_finish); + } else { + return -1; + } + }); +- std::unique_lock<std::mutex> lk(m); +- //cv.wait(lk); +- cv.wait_for(lk, std::chrono::milliseconds(100)); ++ + return 0; + } + +- void get_result(bool* result) { ++ void get_result(Context* on_finish) { + boost::asio::async_read(socket_, boost::asio::buffer(buffer_), + boost::asio::transfer_exactly(544), +- [this, result](const boost::system::error_code& err, size_t cb) { ++ [this, on_finish](const boost::system::error_code& err, size_t cb) { ++ if (cb != 544) { ++ assert(0); ++ } + if (!err) { + rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); + if (io_ctx->type == RBDSC_READ_REPLY) { +- *result = true; ++ on_finish->complete(true); ++ return; + } else { +- *result = false; ++ on_finish->complete(false); ++ return; + } +- cv.notify_one(); +- m_client_process_msg(std::string(buffer_, cb)); + } else { +- return -1; ++ assert(0); ++ return on_finish->complete(false); + } + }); + } + +- void handle_connect(const boost::system::error_code& error) { +- //TODO(): open librbd snap +- } +- +- void handle_write(const boost::system::error_code& error) { +- } + + private: + boost::asio::io_service& io_service_; + boost::asio::io_service::work io_service_work_; + stream_protocol::socket socket_; ++ ++ std::shared_ptr<std::thread> io_thread; + ClientProcessMsg m_client_process_msg; + stream_protocol::endpoint ep_; + char buffer_[1024]; +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +index a9d73a8..e026ec8 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h ++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h +@@ -4,6 +4,7 @@ + #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H + #define CACHE_CONTROLLER_SOCKET_COMMON_H + ++/* + #define RBDSC_REGISTER 0X11 + #define RBDSC_READ 0X12 + #define RBDSC_LOOKUP 0X13 +@@ -11,10 +12,21 @@ + #define RBDSC_READ_REPLY 0X15 + #define RBDSC_LOOKUP_REPLY 0X16 + #define RBDSC_READ_RADOS 0X17 ++*/ + + namespace rbd { + namespace cache { + ++static const int RBDSC_REGISTER = 0X11; ++static const int RBDSC_READ = 0X12; ++static const int RBDSC_LOOKUP = 0X13; ++static const int RBDSC_REGISTER_REPLY = 0X14; ++static const int RBDSC_READ_REPLY = 0X15; ++static const int RBDSC_LOOKUP_REPLY = 0X16; ++static const int RBDSC_READ_RADOS = 0X17; ++ ++ ++ + typedef std::function<void(uint64_t, std::string)> ProcessMsg; + typedef std::function<void(std::string)> ClientProcessMsg; + typedef uint8_t rbdsc_req_type; +-- +2.7.4 + diff --git a/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch new file mode 100644 index 0000000..4adec93 --- /dev/null +++ b/src/ceph/0009-librbd-clean-up-on-rbd-shared-cache.patch @@ -0,0 +1,767 @@ +From 55b29a71c238ac465d05035a51808a3b616a8f46 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Wed, 5 Sep 2018 14:40:54 +0800 +Subject: [PATCH 09/10] librbd: clean up on rbd shared cache + +Signed-off-by: Dehao Shang <dehao.shang@intel.com> +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/common/options.cc | 4 + + .../SharedPersistentObjectCacherObjectDispatch.cc | 28 ++- + .../SharedPersistentObjectCacherObjectDispatch.h | 3 +- + src/tools/rbd_cache/CacheController.cc | 11 +- + src/tools/rbd_cache/CacheController.h | 1 - + src/tools/rbd_cache/CacheControllerSocket.hpp | 213 +++++++++++++------ + .../rbd_cache/CacheControllerSocketClient.hpp | 226 ++++++++++++++------- + src/tools/rbd_cache/CacheControllerSocketCommon.h | 2 + + 8 files changed, 340 insertions(+), 148 deletions(-) + +diff --git a/src/common/options.cc b/src/common/options.cc +index b334c1e..3172744 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -6365,6 +6365,10 @@ static std::vector<Option> get_rbd_options() { + .set_default("/tmp") + .set_description("shared ssd caching data dir"), + ++ Option("rbd_shared_cache_sock", Option::TYPE_STR, Option::LEVEL_ADVANCED) ++ .set_default("/tmp/rbd_shared_ro_cache_sock") ++ .set_description("shared ssd caching domain socket"), ++ + Option("rbd_shared_cache_entries", Option::TYPE_INT, Option::LEVEL_ADVANCED) + .set_default(4096) + .set_description("shared ssd caching data entries"), +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +index 407ce49..7cbc019 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +@@ -33,6 +33,7 @@ SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObje + delete m_cache_client; + } + ++// TODO if connect fails, init will return error to high layer. + template <typename I> + void SharedPersistentObjectCacherObjectDispatch<I>::init() { + auto cct = m_image_ctx->cct; +@@ -44,11 +45,11 @@ void SharedPersistentObjectCacherObjectDispatch<I>::init() { + return; + } + +- ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl; ++ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; + +- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; +- m_cache_client = new rbd::cache::CacheClient(io_service, controller_path.c_str(), +- ([&](std::string s){client_handle_request(s);})); ++ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); ++ m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(), ++ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); + + int ret = m_cache_client->connect(); + if (ret < 0) { +@@ -78,18 +79,29 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read( + io::ExtentMap* extent_map, int* object_dispatch_flags, + io::DispatchResult* dispatch_result, Context** on_finish, + Context* on_dispatched) { ++ + // IO chained in reverse order ++ ++ // Now, policy is : when session have any error, later all read will dispatched to rados layer. ++ if(!m_cache_client->is_session_work()) { ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ return true; ++ // TODO : domain socket have error, all read operation will dispatched to rados layer. ++ } ++ + auto cct = m_image_ctx->cct; + ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" + << object_len << dendl; + ++ + on_dispatched = util::create_async_context_callback(*m_image_ctx, + on_dispatched); + auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { + handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); + }); + +- if (m_cache_client && m_cache_client->connected && m_object_store) { ++ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { + m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), + m_image_ctx->id, oid, ctx); + } +@@ -109,6 +121,7 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( + // try to read from parent image + if (cache) { + int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); ++ //int r = object_len; + if (r != 0) { + *dispatch_result = io::DISPATCH_RESULT_COMPLETE; + //TODO(): complete in syncfile +@@ -123,7 +136,6 @@ int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( + return false; + } + } +- + template <typename I> + void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { + auto cct = m_image_ctx->cct; +@@ -133,7 +145,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s + + switch (io_ctx->type) { + case rbd::cache::RBDSC_REGISTER_REPLY: { +- // open cache handler for volume ++ // open cache handler for volume + ldout(cct, 20) << "SRO cache client open cache handler" << dendl; + m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); + +@@ -153,7 +165,7 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s + } + default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; + break; +- ++ + } + } + +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +index 36b868a..5685244 100644 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h ++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +@@ -118,12 +118,11 @@ private: + uint64_t object_len, ceph::bufferlist* read_data, + io::DispatchResult* dispatch_result, + Context* on_dispatched); ++ void client_handle_request(std::string msg); + + ImageCtxT* m_image_ctx; + +- void client_handle_request(std::string msg); + rbd::cache::CacheClient *m_cache_client = nullptr; +- boost::asio::io_service io_service; + }; + + } // namespace cache +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +index c9d674b..620192c 100644 +--- a/src/tools/rbd_cache/CacheController.cc ++++ b/src/tools/rbd_cache/CacheController.cc +@@ -65,12 +65,12 @@ void CacheController::handle_signal(int signum){} + void CacheController::run() { + try { + //TODO(): use new socket path +- std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo"; ++ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); + std::remove(controller_path.c_str()); + +- m_cache_server = new CacheServer(io_service, controller_path, +- ([&](uint64_t p, std::string s){handle_request(p, s);})); +- io_service.run(); ++ m_cache_server = new CacheServer(controller_path, ++ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); ++ m_cache_server->run(); + } catch (std::exception& e) { + std::cerr << "Exception: " << e.what() << "\n"; + } +@@ -105,7 +105,8 @@ void CacheController::handle_request(uint64_t session_id, std::string msg){ + + break; + } +- ++ std::cout<<"can't recongize request"<<std::endl; ++ assert(0); // TODO replace it. + } + } + +diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h +index 0e3abc1..0e23484 100644 +--- a/src/tools/rbd_cache/CacheController.h ++++ b/src/tools/rbd_cache/CacheController.h +@@ -41,7 +41,6 @@ class CacheController { + void handle_request(uint64_t sesstion_id, std::string msg); + + private: +- boost::asio::io_service io_service; + CacheServer *m_cache_server; + std::vector<const char*> m_args; + CephContext *m_cct; +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +index d178b58..2ff7477 100644 +--- a/src/tools/rbd_cache/CacheControllerSocket.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp +@@ -11,6 +11,7 @@ + #include <string> + #include <boost/bind.hpp> + #include <boost/asio.hpp> ++#include <boost/asio/error.hpp> + #include <boost/algorithm/string.hpp> + #include "CacheControllerSocketCommon.h" + +@@ -23,110 +24,202 @@ namespace cache { + class session : public std::enable_shared_from_this<session> { + public: + session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) +- : session_id(session_id), socket_(io_service), process_msg(processmsg) {} ++ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {} + + stream_protocol::socket& socket() { +- return socket_; ++ return m_dm_socket; + } + + void start() { +- +- boost::asio::async_read(socket_, boost::asio::buffer(data_), +- boost::asio::transfer_exactly(544), ++ if(true) { ++ serial_handing_request(); ++ } else { ++ parallel_handing_request(); ++ } ++ } ++ // flow: ++ // ++ // recv request --> process request --> reply ack ++ // | | ++ // --------------<------------------------- ++ void serial_handing_request() { ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), + boost::bind(&session::handle_read, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ } + ++ // flow : ++ // ++ // --> thread 1: process request ++ // recv request --> thread 2: process request --> reply ack ++ // --> thread n: process request ++ // ++ void parallel_handing_request() { ++ // TODO + } + ++private: ++ + void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { ++ // when recv eof, the most proble is that client side close socket. ++ // so, server side need to end handing_request ++ if(error == boost::asio::error::eof) { ++ std::cout<<"session: async_read : " << error.message() << std::endl; ++ return; ++ } + +- if (!error) { +- if(bytes_transferred != 544){ +- assert(0); +- } +- process_msg(session_id, std::string(data_, bytes_transferred)); ++ if(error) { ++ std::cout<<"session: async_read fails: " << error.message() << std::endl; ++ assert(0); ++ } + ++ if(bytes_transferred != RBDSC_MSG_LEN) { ++ std::cout<<"session : request in-complete. "<<std::endl; ++ assert(0); + } ++ ++ // TODO async_process can increse coding readable. ++ // process_msg_callback call handle async_send ++ process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); + } + +- void handle_write(const boost::system::error_code& error) { +- if (!error) { +- boost::asio::async_read(socket_, boost::asio::buffer(data_), +- boost::asio::transfer_exactly(544), +- boost::bind(&session::handle_read, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); ++ void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { ++ if (error) { ++ std::cout<<"session: async_write fails: " << error.message() << std::endl; ++ assert(0); + } ++ ++ if(bytes_transferred != RBDSC_MSG_LEN) { ++ std::cout<<"session : reply in-complete. "<<std::endl; ++ assert(0); ++ } ++ ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ boost::bind(&session::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ + } + ++public: + void send(std::string msg) { +- +- boost::asio::async_write(socket_, ++ boost::asio::async_write(m_dm_socket, + boost::asio::buffer(msg.c_str(), msg.size()), +- boost::asio::transfer_exactly(544), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), + boost::bind(&session::handle_write, +- shared_from_this(), +- boost::asio::placeholders::error)); ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); + + } + + private: +- uint64_t session_id; +- stream_protocol::socket socket_; ++ uint64_t m_session_id; ++ stream_protocol::socket m_dm_socket; + ProcessMsg process_msg; + + // Buffer used to store data received from the client. + //std::array<char, 1024> data_; +- char data_[1024]; ++ char m_buffer[1024]; + }; + + typedef std::shared_ptr<session> session_ptr; + + class CacheServer { + public: +- CacheServer(boost::asio::io_service& io_service, +- const std::string& file, ProcessMsg processmsg) +- : io_service_(io_service), +- server_process_msg(processmsg), +- acceptor_(io_service, stream_protocol::endpoint(file)) +- { +- session_ptr new_session(new session(session_id, io_service_, server_process_msg)); +- acceptor_.async_accept(new_session->socket(), +- boost::bind(&CacheServer::handle_accept, this, new_session, +- boost::asio::placeholders::error)); +- } +- +- void handle_accept(session_ptr new_session, +- const boost::system::error_code& error) +- { +- //TODO(): open librbd snap +- if (!error) { +- new_session->start(); +- session_map.emplace(session_id, new_session); +- session_id++; +- new_session.reset(new session(session_id, io_service_, server_process_msg)); +- acceptor_.async_accept(new_session->socket(), +- boost::bind(&CacheServer::handle_accept, this, new_session, +- boost::asio::placeholders::error)); ++ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) ++ : m_cct(cct), m_server_process_msg(processmsg), ++ m_local_path(file), ++ m_acceptor(m_io_service) ++ {} ++ ++ void run() { ++ bool ret; ++ ret = start_accept(); ++ if(!ret) { ++ return; + } ++ m_io_service.run(); + } + ++ // TODO : use callback to replace this function. + void send(uint64_t session_id, std::string msg) { +- auto it = session_map.find(session_id); +- if (it != session_map.end()) { ++ auto it = m_session_map.find(session_id); ++ if (it != m_session_map.end()) { + it->second->send(msg); ++ } else { ++ // TODO : why don't find existing session id ? ++ std::cout<<"don't find session id..."<<std::endl; ++ assert(0); ++ } ++ } ++ ++private: ++ // when creating one acceptor, can control every step in this way. ++ bool start_accept() { ++ boost::system::error_code ec; ++ m_acceptor.open(m_local_path.protocol(), ec); ++ if(ec) { ++ std::cout << "m_acceptor open fails: " << ec.message() << std::endl; ++ return false; ++ } ++ ++ // TODO control acceptor attribute. ++ ++ m_acceptor.bind(m_local_path, ec); ++ if(ec) { ++ std::cout << "m_acceptor bind fails: " << ec.message() << std::endl; ++ return false; ++ } ++ ++ m_acceptor.listen(boost::asio::socket_base::max_connections, ec); ++ if(ec) { ++ std::cout << "m_acceptor listen fails: " << ec.message() << std::endl; ++ return false; + } ++ ++ accept(); ++ return true; ++ } ++ ++ void accept() { ++ session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg)); ++ m_acceptor.async_accept(new_session->socket(), ++ boost::bind(&CacheServer::handle_accept, this, new_session, ++ boost::asio::placeholders::error)); ++ } ++ ++ void handle_accept(session_ptr new_session, const boost::system::error_code& error) { ++ //TODO(): open librbd snap ... yuan ++ ++ if(error) { ++ std::cout << "async accept fails : " << error.message() << std::endl; ++ assert(0); // TODO ++ } ++ ++ // must put session into m_session_map at the front of session.start() ++ m_session_map.emplace(m_session_id, new_session); ++ // TODO : session setting ++ new_session->start(); ++ m_session_id++; ++ ++ // lanuch next accept ++ accept(); + } + + private: +- boost::asio::io_service& io_service_; +- ProcessMsg server_process_msg; +- stream_protocol::acceptor acceptor_; +- uint64_t session_id = 1; +- std::map<uint64_t, session_ptr> session_map; ++ CephContext* m_cct; ++ boost::asio::io_service m_io_service; // TODO wrapper it. ++ ProcessMsg m_server_process_msg; ++ stream_protocol::endpoint m_local_path; ++ stream_protocol::acceptor m_acceptor; ++ uint64_t m_session_id = 1; ++ std::map<uint64_t, session_ptr> m_session_map; + }; + + } // namespace cache +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +index 3b0ca00..964f888 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +@@ -4,9 +4,12 @@ + #ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H + #define CACHE_CONTROLLER_SOCKET_CLIENT_H + ++#include <atomic> + #include <boost/asio.hpp> + #include <boost/bind.hpp> ++#include <boost/asio/error.hpp> + #include <boost/algorithm/string.hpp> ++#include "librbd/ImageCtx.h" + #include "include/assert.h" + #include "include/Context.h" + #include "CacheControllerSocketCommon.h" +@@ -19,32 +22,64 @@ namespace cache { + + class CacheClient { + public: +- CacheClient(boost::asio::io_service& io_service, +- const std::string& file, ClientProcessMsg processmsg) +- : io_service_(io_service), +- io_service_work_(io_service), +- socket_(io_service), ++ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) ++ : m_io_service_work(m_io_service), ++ m_dm_socket(m_io_service), + m_client_process_msg(processmsg), +- ep_(stream_protocol::endpoint(file)) ++ m_ep(stream_protocol::endpoint(file)), ++ m_session_work(false), ++ cct(ceph_ctx) + { +- io_thread.reset(new std::thread([this](){io_service_.run(); })); ++ // TODO wrapper io_service ++ std::thread thd([this](){ ++ m_io_service.run();}); ++ thd.detach(); + } + +- ~CacheClient() { +- io_service_.stop(); +- io_thread->join(); ++ void run(){ + } + +- void run(){ +- } ++ bool is_session_work() { ++ return m_session_work.load() == true; ++ } ++ ++ // just when error occur, call this method. ++ void close() { ++ m_session_work.store(false); ++ boost::system::error_code close_ec; ++ m_dm_socket.close(close_ec); ++ if(close_ec) { ++ std::cout << "close: " << close_ec.message() << std::endl; ++ } ++ std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl; ++ } + + int connect() { +- try { +- socket_.connect(ep_); +- } catch (std::exception& e) { ++ boost::system::error_code ec; ++ m_dm_socket.connect(m_ep, ec); ++ if(ec) { ++ if(ec == boost::asio::error::connection_refused) { ++ std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. " ++ << "Now data will be read from ceph cluster " << std::endl; ++ } else { ++ std::cout << "connect: " << ec.message() << std::endl; ++ } ++ ++ if(m_dm_socket.is_open()) { ++ // Set to indicate what error occurred, if any. ++ // Note that, even if the function indicates an error, ++ // the underlying descriptor is closed. ++ boost::system::error_code close_ec; ++ m_dm_socket.close(close_ec); ++ if(close_ec) { ++ std::cout << "close: " << close_ec.message() << std::endl; ++ } ++ } + return -1; + } +- connected = true; ++ ++ std::cout<<"connect success"<<std::endl; ++ + return 0; + } + +@@ -57,27 +92,51 @@ public: + message->vol_size = vol_size; + message->offset = 0; + message->length = 0; +- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), +- [this, message](const boost::system::error_code& err, size_t cb) { +- delete message; +- if (!err) { +- boost::asio::async_read(socket_, boost::asio::buffer(buffer_), +- boost::asio::transfer_exactly(544), +- [this](const boost::system::error_code& err, size_t cb) { +- if (!err) { +- m_client_process_msg(std::string(buffer_, cb)); +- } else { +- return -1; +- } +- }); +- } else { +- return -1; +- } +- }); ++ ++ uint64_t ret; ++ boost::system::error_code ec; ++ ++ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); ++ if(ec) { ++ std::cout << "write fails : " << ec.message() << std::endl; ++ return -1; ++ } ++ ++ if(ret != message->size()) { ++ std::cout << "write fails : ret != send_bytes "<< std::endl; ++ return -1; ++ } ++ ++ // hard code TODO ++ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); ++ if(ec == boost::asio::error::eof) { ++ std::cout<< "recv eof"<<std::endl; ++ return -1; ++ } ++ ++ if(ec) { ++ std::cout << "write fails : " << ec.message() << std::endl; ++ return -1; ++ } ++ ++ if(ret != RBDSC_MSG_LEN) { ++ std::cout << "write fails : ret != receive bytes " << std::endl; ++ return -1; ++ } ++ ++ m_client_process_msg(std::string(m_recv_buffer, ret)); ++ ++ delete message; ++ ++ std::cout << "register volume success" << std::endl; ++ ++ // TODO ++ m_session_work.store(true); + + return 0; + } + ++ // if occur any error, we just return false. Then read from rados. + int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { + rbdsc_req_type_t *message = new rbdsc_req_type_t(); + message->type = RBDSC_READ; +@@ -87,59 +146,82 @@ public: + message->offset = 0; + message->length = 0; + +- boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()), ++ boost::asio::async_write(m_dm_socket, ++ boost::asio::buffer((char*)message, message->size()), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), + [this, on_finish, message](const boost::system::error_code& err, size_t cb) { +- delete message; +- if (!err) { ++ delete message; ++ if(err) { ++ std::cout<< "lookup_object: async_write fails." << err.message() << std::endl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ if(cb != RBDSC_MSG_LEN) { ++ std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } + get_result(on_finish); +- } else { +- return -1; +- } + }); + + return 0; + } + + void get_result(Context* on_finish) { +- boost::asio::async_read(socket_, boost::asio::buffer(buffer_), +- boost::asio::transfer_exactly(544), ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), + [this, on_finish](const boost::system::error_code& err, size_t cb) { +- if (cb != 544) { +- assert(0); +- } +- if (!err) { +- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_); +- if (io_ctx->type == RBDSC_READ_REPLY) { +- on_finish->complete(true); +- return; +- } else { +- on_finish->complete(false); +- return; +- } +- } else { +- assert(0); +- return on_finish->complete(false); +- } ++ if(err == boost::asio::error::eof) { ++ std::cout<<"get_result: ack is EOF." << std::endl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ if(err) { ++ std::cout<< "get_result: async_read fails:" << err.message() << std::endl; ++ close(); ++ on_finish->complete(false); // TODO replace this assert with some metohds. ++ return; ++ } ++ if (cb != RBDSC_MSG_LEN) { ++ close(); ++ std::cout << "get_result: in-complete ack." << std::endl; ++ on_finish->complete(false); // TODO: replace this assert with some methods. ++ } ++ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); ++ ++ // TODO: re-occur yuan's bug ++ if(io_ctx->type == RBDSC_READ) { ++ std::cout << "get rbdsc_read... " << std::endl; ++ assert(0); ++ } ++ ++ if (io_ctx->type == RBDSC_READ_REPLY) { ++ on_finish->complete(true); ++ return; ++ } else { ++ on_finish->complete(false); ++ return; ++ } + }); + } + +- + private: +- boost::asio::io_service& io_service_; +- boost::asio::io_service::work io_service_work_; +- stream_protocol::socket socket_; +- +- std::shared_ptr<std::thread> io_thread; ++ boost::asio::io_service m_io_service; ++ boost::asio::io_service::work m_io_service_work; ++ stream_protocol::socket m_dm_socket; + ClientProcessMsg m_client_process_msg; +- stream_protocol::endpoint ep_; +- char buffer_[1024]; +- int block_size_ = 1024; +- +- std::condition_variable cv; +- std::mutex m; +- +-public: +- bool connected = false; ++ stream_protocol::endpoint m_ep; ++ char m_recv_buffer[1024]; ++ ++ // atomic modfiy for this variable. ++ // thread 1 : asio callback thread modify it. ++ // thread 2 : librbd read it. ++ std::atomic<bool> m_session_work; ++ CephContext* cct; + }; + + } // namespace cache +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +index e026ec8..e17529a 100644 +--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h ++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h +@@ -55,6 +55,8 @@ struct rbdsc_req_type_t { + } + }; + ++static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); ++ + } // namespace cache + } // namespace rbd + #endif +-- +2.7.4 + diff --git a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch new file mode 100644 index 0000000..46040e6 --- /dev/null +++ b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch @@ -0,0 +1,3510 @@ +From 9a097084d06e7186206b43d9c81d1f648791d7a4 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Fri, 7 Sep 2018 08:29:51 +0800 +Subject: [PATCH 10/10] librbd: new namespace ceph immutable obj cache + +clean up class/func names to use the new namespace + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/common/options.cc | 2 +- + src/common/subsys.h | 3 +- + src/librbd/CMakeLists.txt | 3 +- + .../SharedPersistentObjectCacherObjectDispatch.cc | 175 ---------------- + .../SharedPersistentObjectCacherObjectDispatch.h | 133 ------------ + src/librbd/cache/SharedReadOnlyObjectDispatch.cc | 170 +++++++++++++++ + src/librbd/cache/SharedReadOnlyObjectDispatch.h | 126 ++++++++++++ + src/librbd/image/OpenRequest.cc | 4 +- + src/tools/CMakeLists.txt | 2 +- + .../ceph_immutable_object_cache/CMakeLists.txt | 11 + + .../ceph_immutable_object_cache/CacheClient.cc | 205 ++++++++++++++++++ + .../ceph_immutable_object_cache/CacheClient.h | 53 +++++ + .../ceph_immutable_object_cache/CacheController.cc | 117 +++++++++++ + .../ceph_immutable_object_cache/CacheController.h | 53 +++++ + .../ceph_immutable_object_cache/CacheServer.cc | 99 +++++++++ + .../ceph_immutable_object_cache/CacheServer.h | 54 +++++ + .../ceph_immutable_object_cache/CacheSession.cc | 115 +++++++++++ + .../ceph_immutable_object_cache/CacheSession.h | 58 ++++++ + .../ObjectCacheStore.cc | 172 ++++++++++++++++ + .../ceph_immutable_object_cache/ObjectCacheStore.h | 70 +++++++ + src/tools/ceph_immutable_object_cache/Policy.hpp | 33 +++ + .../ceph_immutable_object_cache/SimplePolicy.hpp | 163 +++++++++++++++ + .../ceph_immutable_object_cache/SocketCommon.h | 54 +++++ + src/tools/ceph_immutable_object_cache/main.cc | 85 ++++++++ + src/tools/rbd_cache/CMakeLists.txt | 9 - + src/tools/rbd_cache/CacheController.cc | 116 ----------- + src/tools/rbd_cache/CacheController.h | 54 ----- + src/tools/rbd_cache/CacheControllerSocket.hpp | 228 -------------------- + .../rbd_cache/CacheControllerSocketClient.hpp | 229 --------------------- + src/tools/rbd_cache/CacheControllerSocketCommon.h | 62 ------ + src/tools/rbd_cache/ObjectCacheStore.cc | 172 ---------------- + src/tools/rbd_cache/ObjectCacheStore.h | 70 ------- + src/tools/rbd_cache/Policy.hpp | 30 --- + src/tools/rbd_cache/SimplePolicy.hpp | 160 -------------- + src/tools/rbd_cache/main.cc | 85 -------- + 35 files changed, 1646 insertions(+), 1529 deletions(-) + delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc + delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h + create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.cc + create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.h + create mode 100644 src/tools/ceph_immutable_object_cache/CMakeLists.txt + create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.cc + create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.h + create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.cc + create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.h + create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.cc + create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.h + create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.cc + create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.h + create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc + create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.h + create mode 100644 src/tools/ceph_immutable_object_cache/Policy.hpp + create mode 100644 src/tools/ceph_immutable_object_cache/SimplePolicy.hpp + create mode 100644 src/tools/ceph_immutable_object_cache/SocketCommon.h + create mode 100644 src/tools/ceph_immutable_object_cache/main.cc + delete mode 100644 src/tools/rbd_cache/CMakeLists.txt + delete mode 100644 src/tools/rbd_cache/CacheController.cc + delete mode 100644 src/tools/rbd_cache/CacheController.h + delete mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp + delete mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp + delete mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h + delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc + delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.h + delete mode 100644 src/tools/rbd_cache/Policy.hpp + delete mode 100644 src/tools/rbd_cache/SimplePolicy.hpp + delete mode 100644 src/tools/rbd_cache/main.cc + +diff --git a/src/common/options.cc b/src/common/options.cc +index 3172744..bf00aab 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -6358,7 +6358,7 @@ static std::vector<Option> get_rbd_options() { + .set_description("time in seconds for detecting a hung thread"), + + Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) +- .set_default(true) ++ .set_default(false) + .set_description("whether to enable shared ssd caching"), + + Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED) +diff --git a/src/common/subsys.h b/src/common/subsys.h +index bdd2d0e..5b532c1 100644 +--- a/src/common/subsys.h ++++ b/src/common/subsys.h +@@ -36,9 +36,10 @@ SUBSYS(objecter, 0, 1) + SUBSYS(rados, 0, 5) + SUBSYS(rbd, 0, 5) + SUBSYS(rbd_mirror, 0, 5) +-SUBSYS(rbd_replay, 0, 5) + SUBSYS(journaler, 0, 5) + SUBSYS(objectcacher, 0, 5) ++SUBSYS(immutable_obj_cache, 0, 5) ++SUBSYS(rbd_replay, 0, 5) + SUBSYS(client, 0, 5) + SUBSYS(osd, 1, 5) + SUBSYS(optracker, 0, 5) +diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt +index 540ee78..c9bfb6f 100644 +--- a/src/librbd/CMakeLists.txt ++++ b/src/librbd/CMakeLists.txt +@@ -32,7 +32,7 @@ set(librbd_internal_srcs + api/Snapshot.cc + cache/ImageWriteback.cc + cache/ObjectCacherObjectDispatch.cc +- cache/SharedPersistentObjectCacherObjectDispatch.cc ++ cache/SharedReadOnlyObjectDispatch.cc + cache/SharedPersistentObjectCacher.cc + cache/SharedPersistentObjectCacherFile.cc + deep_copy/ImageCopyRequest.cc +@@ -125,6 +125,7 @@ set(librbd_internal_srcs + trash/MoveRequest.cc + watcher/Notifier.cc + watcher/RewatchRequest.cc ++ ${CMAKE_SOURCE_DIR}/src/tools/ceph_immutable_object_cache/CacheClient.cc + ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) + + add_library(rbd_api STATIC librbd.cc) +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +deleted file mode 100644 +index 7cbc019..0000000 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc ++++ /dev/null +@@ -1,175 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h" +-#include "common/WorkQueue.h" +-#include "librbd/ImageCtx.h" +-#include "librbd/Journal.h" +-#include "librbd/Utils.h" +-#include "librbd/LibrbdWriteback.h" +-#include "librbd/io/ObjectDispatchSpec.h" +-#include "librbd/io/ObjectDispatcher.h" +-#include "librbd/io/Utils.h" +-#include "osd/osd_types.h" +-#include "osdc/WritebackHandler.h" +-#include <vector> +- +-#define dout_subsys ceph_subsys_rbd +-#undef dout_prefix +-#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \ +- << this << " " << __func__ << ": " +- +-namespace librbd { +-namespace cache { +- +-template <typename I> +-SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch( +- I* image_ctx) : m_image_ctx(image_ctx) { +-} +- +-template <typename I> +-SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { +- delete m_object_store; +- delete m_cache_client; +-} +- +-// TODO if connect fails, init will return error to high layer. +-template <typename I> +-void SharedPersistentObjectCacherObjectDispatch<I>::init() { +- auto cct = m_image_ctx->cct; +- ldout(cct, 5) << dendl; +- +- if (m_image_ctx->parent != nullptr) { +- //TODO(): should we cover multi-leveled clone? +- ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; +- return; +- } +- +- ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; +- +- std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); +- m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(), +- ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); +- +- int ret = m_cache_client->connect(); +- if (ret < 0) { +- ldout(cct, 5) << "SRO cache client fail to connect with local controller: " +- << "please start rbd-cache daemon" +- << dendl; +- } else { +- ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " +- << "name = " << m_image_ctx->id +- << dendl; +- +- ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), +- m_image_ctx->id, m_image_ctx->size); +- +- if (ret >= 0) { +- // add ourself to the IO object dispatcher chain +- m_image_ctx->io_object_dispatcher->register_object_dispatch(this); +- } +- } +-} +- +-template <typename I> +-bool SharedPersistentObjectCacherObjectDispatch<I>::read( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- uint64_t object_len, librados::snap_t snap_id, int op_flags, +- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, +- io::ExtentMap* extent_map, int* object_dispatch_flags, +- io::DispatchResult* dispatch_result, Context** on_finish, +- Context* on_dispatched) { +- +- // IO chained in reverse order +- +- // Now, policy is : when session have any error, later all read will dispatched to rados layer. +- if(!m_cache_client->is_session_work()) { +- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; +- on_dispatched->complete(0); +- return true; +- // TODO : domain socket have error, all read operation will dispatched to rados layer. +- } +- +- auto cct = m_image_ctx->cct; +- ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" +- << object_len << dendl; +- +- +- on_dispatched = util::create_async_context_callback(*m_image_ctx, +- on_dispatched); +- auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { +- handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); +- }); +- +- if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { +- m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), +- m_image_ctx->id, oid, ctx); +- } +- return true; +-} +- +-template <typename I> +-int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( +- bool cache, +- const std::string &oid, uint64_t object_off, uint64_t object_len, +- ceph::bufferlist* read_data, io::DispatchResult* dispatch_result, +- Context* on_dispatched) { +- // IO chained in reverse order +- auto cct = m_image_ctx->cct; +- ldout(cct, 20) << dendl; +- +- // try to read from parent image +- if (cache) { +- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); +- //int r = object_len; +- if (r != 0) { +- *dispatch_result = io::DISPATCH_RESULT_COMPLETE; +- //TODO(): complete in syncfile +- on_dispatched->complete(r); +- ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl; +- return true; +- } +- } else { +- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; +- on_dispatched->complete(0); +- ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl; +- return false; +- } +-} +-template <typename I> +-void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { +- auto cct = m_image_ctx->cct; +- ldout(cct, 20) << dendl; +- +- rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); +- +- switch (io_ctx->type) { +- case rbd::cache::RBDSC_REGISTER_REPLY: { +- // open cache handler for volume +- ldout(cct, 20) << "SRO cache client open cache handler" << dendl; +- m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); +- +- break; +- } +- case rbd::cache::RBDSC_READ_REPLY: { +- ldout(cct, 20) << "SRO cache client start to read cache" << dendl; +- //TODO(): should call read here +- +- break; +- } +- case rbd::cache::RBDSC_READ_RADOS: { +- ldout(cct, 20) << "SRO cache client start to read rados" << dendl; +- //TODO(): should call read here +- +- break; +- } +- default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; +- break; +- +- } +-} +- +-} // namespace cache +-} // namespace librbd +- +-template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +deleted file mode 100644 +index 5685244..0000000 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h ++++ /dev/null +@@ -1,133 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H +-#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H +- +-#include "librbd/io/ObjectDispatchInterface.h" +-#include "common/Mutex.h" +-#include "osdc/ObjectCacher.h" +-#include "tools/rbd_cache/CacheControllerSocketClient.hpp" +-#include "SharedPersistentObjectCacher.h" +- +-struct WritebackHandler; +- +-namespace librbd { +- +-class ImageCtx; +- +-namespace cache { +- +-/** +- * Facade around the OSDC object cacher to make it align with +- * the object dispatcher interface +- */ +-template <typename ImageCtxT = ImageCtx> +-class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface { +-public: +- static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) { +- return new SharedPersistentObjectCacherObjectDispatch(image_ctx); +- } +- +- SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx); +- ~SharedPersistentObjectCacherObjectDispatch() override; +- +- io::ObjectDispatchLayer get_object_dispatch_layer() const override { +- return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; +- } +- +- void init(); +- void shut_down(Context* on_finish) { +- m_image_ctx->op_work_queue->queue(on_finish, 0); +- } +- +- bool read( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- uint64_t object_len, librados::snap_t snap_id, int op_flags, +- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, +- io::ExtentMap* extent_map, int* object_dispatch_flags, +- io::DispatchResult* dispatch_result, Context** on_finish, +- Context* on_dispatched) override; +- +- bool discard( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- uint64_t object_len, const ::SnapContext &snapc, int discard_flags, +- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, +- uint64_t* journal_tid, io::DispatchResult* dispatch_result, +- Context** on_finish, Context* on_dispatched) { +- return false; +- } +- +- bool write( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, +- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, +- uint64_t* journal_tid, io::DispatchResult* dispatch_result, +- Context** on_finish, Context* on_dispatched) { +- return false; +- } +- +- bool write_same( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- uint64_t object_len, io::Extents&& buffer_extents, +- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, +- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, +- uint64_t* journal_tid, io::DispatchResult* dispatch_result, +- Context** on_finish, Context* on_dispatched) { +- return false; +- } +- +- bool compare_and_write( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, +- const ::SnapContext &snapc, int op_flags, +- const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, +- int* object_dispatch_flags, uint64_t* journal_tid, +- io::DispatchResult* dispatch_result, Context** on_finish, +- Context* on_dispatched) { +- return false; +- } +- +- bool flush( +- io::FlushSource flush_source, const ZTracer::Trace &parent_trace, +- io::DispatchResult* dispatch_result, Context** on_finish, +- Context* on_dispatched) { +- return false; +- } +- +- bool invalidate_cache(Context* on_finish) { +- return false; +- } +- +- bool reset_existence_cache(Context* on_finish) { +- return false; +- } +- +- void extent_overwritten( +- uint64_t object_no, uint64_t object_off, uint64_t object_len, +- uint64_t journal_tid, uint64_t new_journal_tid) { +- } +- +- SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; +- +-private: +- +- int handle_read_cache( +- bool cache, +- const std::string &oid, uint64_t object_off, +- uint64_t object_len, ceph::bufferlist* read_data, +- io::DispatchResult* dispatch_result, +- Context* on_dispatched); +- void client_handle_request(std::string msg); +- +- ImageCtxT* m_image_ctx; +- +- rbd::cache::CacheClient *m_cache_client = nullptr; +-}; +- +-} // namespace cache +-} // namespace librbd +- +-extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; +- +-#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H +diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.cc b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc +new file mode 100644 +index 0000000..23c7dbe +--- /dev/null ++++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc +@@ -0,0 +1,170 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/WorkQueue.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/Journal.h" ++#include "librbd/Utils.h" ++#include "librbd/LibrbdWriteback.h" ++#include "librbd/io/ObjectDispatchSpec.h" ++#include "librbd/io/ObjectDispatcher.h" ++#include "librbd/io/Utils.h" ++#include "librbd/cache/SharedReadOnlyObjectDispatch.h" ++#include "osd/osd_types.h" ++#include "osdc/WritebackHandler.h" ++ ++#include <vector> ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::cache::SharedReadOnlyObjectDispatch: " \ ++ << this << " " << __func__ << ": " ++ ++namespace librbd { ++namespace cache { ++ ++template <typename I> ++SharedReadOnlyObjectDispatch<I>::SharedReadOnlyObjectDispatch( ++ I* image_ctx) : m_image_ctx(image_ctx) { ++} ++ ++template <typename I> ++SharedReadOnlyObjectDispatch<I>::~SharedReadOnlyObjectDispatch() { ++ delete m_object_store; ++ delete m_cache_client; ++} ++ ++// TODO if connect fails, init will return error to high layer. ++template <typename I> ++void SharedReadOnlyObjectDispatch<I>::init() { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 5) << dendl; ++ ++ if (m_image_ctx->parent != nullptr) { ++ //TODO(): should we cover multi-leveled clone? ++ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; ++ return; ++ } ++ ++ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; ++ ++ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); ++ m_cache_client = new ceph::immutable_obj_cache::CacheClient(controller_path.c_str(), ++ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); ++ ++ int ret = m_cache_client->connect(); ++ if (ret < 0) { ++ ldout(cct, 5) << "SRO cache client fail to connect with local controller: " ++ << "please start rbd-cache daemon" ++ << dendl; ++ } else { ++ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " ++ << "name = " << m_image_ctx->id ++ << dendl; ++ ++ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), ++ m_image_ctx->id, m_image_ctx->size); ++ ++ if (ret >= 0) { ++ // add ourself to the IO object dispatcher chain ++ m_image_ctx->io_object_dispatcher->register_object_dispatch(this); ++ } ++ } ++} ++ ++template <typename I> ++bool SharedReadOnlyObjectDispatch<I>::read( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, librados::snap_t snap_id, int op_flags, ++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, ++ io::ExtentMap* extent_map, int* object_dispatch_flags, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" ++ << object_len << dendl; ++ ++ // if any session fails, later reads will go to rados ++ if(!m_cache_client->is_session_work()) { ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ return true; ++ // TODO(): fix domain socket error ++ } ++ ++ auto ctx = new FunctionContext([this, oid, object_off, object_len, ++ read_data, dispatch_result, on_dispatched](bool cache) { ++ handle_read_cache(cache, oid, object_off, object_len, ++ read_data, dispatch_result, on_dispatched); ++ }); ++ ++ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { ++ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), ++ m_image_ctx->id, oid, ctx); ++ } ++ return true; ++} ++ ++template <typename I> ++int SharedReadOnlyObjectDispatch<I>::handle_read_cache( ++ bool cache, const std::string &oid, uint64_t object_off, ++ uint64_t object_len, ceph::bufferlist* read_data, ++ io::DispatchResult* dispatch_result, Context* on_dispatched) { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << dendl; ++ ++ // try to read from parent image ++ if (cache) { ++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); ++ //int r = object_len; ++ if (r != 0) { ++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; ++ //TODO(): complete in syncfile ++ on_dispatched->complete(r); ++ ldout(cct, 20) << "read cache: " << *dispatch_result <<dendl; ++ return true; ++ } ++ } else { ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ ldout(cct, 20) << "read rados: " << *dispatch_result <<dendl; ++ return false; ++ } ++} ++template <typename I> ++void SharedReadOnlyObjectDispatch<I>::client_handle_request(std::string msg) { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << dendl; ++ ++ ceph::immutable_obj_cache::rbdsc_req_type_t *io_ctx = (ceph::immutable_obj_cache::rbdsc_req_type_t*)(msg.c_str()); ++ ++ switch (io_ctx->type) { ++ case ceph::immutable_obj_cache::RBDSC_REGISTER_REPLY: { ++ // open cache handler for volume ++ ldout(cct, 20) << "SRO cache client open cache handler" << dendl; ++ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); ++ ++ break; ++ } ++ case ceph::immutable_obj_cache::RBDSC_READ_REPLY: { ++ ldout(cct, 20) << "SRO cache client start to read cache" << dendl; ++ //TODO(): should call read here ++ ++ break; ++ } ++ case ceph::immutable_obj_cache::RBDSC_READ_RADOS: { ++ ldout(cct, 20) << "SRO cache client start to read rados" << dendl; ++ //TODO(): should call read here ++ ++ break; ++ } ++ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; ++ break; ++ ++ } ++} ++ ++} // namespace cache ++} // namespace librbd ++ ++template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>; +diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.h b/src/librbd/cache/SharedReadOnlyObjectDispatch.h +new file mode 100644 +index 0000000..9b56da9 +--- /dev/null ++++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.h +@@ -0,0 +1,126 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H ++#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H ++ ++#include "common/Mutex.h" ++#include "SharedPersistentObjectCacher.h" ++#include "librbd/io/ObjectDispatchInterface.h" ++#include "tools/ceph_immutable_object_cache/CacheClient.h" ++ ++ ++namespace librbd { ++ ++class ImageCtx; ++ ++namespace cache { ++ ++template <typename ImageCtxT = ImageCtx> ++class SharedReadOnlyObjectDispatch : public io::ObjectDispatchInterface { ++public: ++ static SharedReadOnlyObjectDispatch* create(ImageCtxT* image_ctx) { ++ return new SharedReadOnlyObjectDispatch(image_ctx); ++ } ++ ++ SharedReadOnlyObjectDispatch(ImageCtxT* image_ctx); ++ ~SharedReadOnlyObjectDispatch() override; ++ ++ io::ObjectDispatchLayer get_object_dispatch_layer() const override { ++ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; ++ } ++ ++ void init(); ++ void shut_down(Context* on_finish) { ++ m_image_ctx->op_work_queue->queue(on_finish, 0); ++ } ++ ++ bool read( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, librados::snap_t snap_id, int op_flags, ++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, ++ io::ExtentMap* extent_map, int* object_dispatch_flags, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) override; ++ ++ bool discard( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, const ::SnapContext &snapc, int discard_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool write( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool write_same( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, io::Extents&& buffer_extents, ++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool compare_and_write( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, ++ const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, ++ int* object_dispatch_flags, uint64_t* journal_tid, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ return false; ++ } ++ ++ bool flush( ++ io::FlushSource flush_source, const ZTracer::Trace &parent_trace, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ return false; ++ } ++ ++ bool invalidate_cache(Context* on_finish) { ++ return false; ++ } ++ ++ bool reset_existence_cache(Context* on_finish) { ++ return false; ++ } ++ ++ void extent_overwritten( ++ uint64_t object_no, uint64_t object_off, uint64_t object_len, ++ uint64_t journal_tid, uint64_t new_journal_tid) { ++ } ++ ++private: ++ ++ int handle_read_cache( ++ bool cache, ++ const std::string &oid, uint64_t object_off, ++ uint64_t object_len, ceph::bufferlist* read_data, ++ io::DispatchResult* dispatch_result, ++ Context* on_dispatched); ++ void client_handle_request(std::string msg); ++ ++ ImageCtxT* m_image_ctx; ++ ++ ceph::immutable_obj_cache::CacheClient *m_cache_client = nullptr; ++ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; ++}; ++ ++} // namespace cache ++} // namespace librbd ++ ++extern template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>; ++ ++#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H +diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc +index 30a7b66..57ce92f 100644 +--- a/src/librbd/image/OpenRequest.cc ++++ b/src/librbd/image/OpenRequest.cc +@@ -8,7 +8,7 @@ + #include "librbd/ImageCtx.h" + #include "librbd/Utils.h" + #include "librbd/cache/ObjectCacherObjectDispatch.h" +-#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc" ++#include "librbd/cache/SharedReadOnlyObjectDispatch.cc" + #include "librbd/image/CloseRequest.h" + #include "librbd/image/RefreshRequest.h" + #include "librbd/image/SetSnapRequest.h" +@@ -457,7 +457,7 @@ Context *OpenRequest<I>::send_init_cache(int *result) { + // enable Shared Read-only cache for parent image + if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) { + ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl; +- auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx); ++ auto sro_cache = cache::SharedReadOnlyObjectDispatch<I>::create(m_image_ctx); + sro_cache->init(); + } + +diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt +index 72ab342..f7c5872 100644 +--- a/src/tools/CMakeLists.txt ++++ b/src/tools/CMakeLists.txt +@@ -99,7 +99,6 @@ endif(WITH_CEPHFS) + if(WITH_RBD) + add_subdirectory(rbd) + add_subdirectory(rbd_mirror) +- add_subdirectory(rbd_cache) + if(LINUX) + add_subdirectory(rbd_nbd) + endif() +@@ -108,4 +107,5 @@ if(WITH_RBD) + endif() + endif(WITH_RBD) + ++add_subdirectory(ceph_immutable_object_cache) + add_subdirectory(ceph-dencoder) +diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt +new file mode 100644 +index 0000000..c7c7af3 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CMakeLists.txt +@@ -0,0 +1,11 @@ ++add_executable(ceph-immutable-object-cache ++ ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc ++ ObjectCacheStore.cc ++ CacheController.cc ++ CacheServer.cc ++ CacheSession.cc ++ main.cc) ++target_link_libraries(ceph-immutable-object-cache ++ librados ++ global) ++install(TARGETS ceph-immutable-object-cache DESTINATION bin) +diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc +new file mode 100644 +index 0000000..a7116bf +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheClient.cc +@@ -0,0 +1,205 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "CacheClient.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \ ++ << __func__ << ": " ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++ CacheClient::CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) ++ : m_io_service_work(m_io_service), ++ m_dm_socket(m_io_service), ++ m_client_process_msg(processmsg), ++ m_ep(stream_protocol::endpoint(file)), ++ m_session_work(false), ++ cct(ceph_ctx) ++ { ++ // TODO wrapper io_service ++ std::thread thd([this](){m_io_service.run();}); ++ thd.detach(); ++ } ++ ++ void CacheClient::run(){ ++ } ++ ++ bool CacheClient::is_session_work() { ++ return m_session_work.load() == true; ++ } ++ ++ // just when error occur, call this method. ++ void CacheClient::close() { ++ m_session_work.store(false); ++ boost::system::error_code close_ec; ++ m_dm_socket.close(close_ec); ++ if(close_ec) { ++ ldout(cct, 20) << "close: " << close_ec.message() << dendl; ++ } ++ ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl; ++ } ++ ++ int CacheClient::connect() { ++ boost::system::error_code ec; ++ m_dm_socket.connect(m_ep, ec); ++ if(ec) { ++ if(ec == boost::asio::error::connection_refused) { ++ ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. " ++ << "Now data will be read from ceph cluster " << dendl; ++ } else { ++ ldout(cct, 20) << "connect: " << ec.message() << dendl; ++ } ++ ++ if(m_dm_socket.is_open()) { ++ // Set to indicate what error occurred, if any. ++ // Note that, even if the function indicates an error, ++ // the underlying descriptor is closed. ++ boost::system::error_code close_ec; ++ m_dm_socket.close(close_ec); ++ if(close_ec) { ++ ldout(cct, 20) << "close: " << close_ec.message() << dendl; ++ } ++ } ++ return -1; ++ } ++ ++ ldout(cct, 20) <<"connect success"<< dendl; ++ ++ return 0; ++ } ++ ++ int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { ++ // cache controller will init layout ++ rbdsc_req_type_t *message = new rbdsc_req_type_t(); ++ message->type = RBDSC_REGISTER; ++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); ++ memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); ++ message->vol_size = vol_size; ++ message->offset = 0; ++ message->length = 0; ++ ++ uint64_t ret; ++ boost::system::error_code ec; ++ ++ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); ++ if(ec) { ++ ldout(cct, 20) << "write fails : " << ec.message() << dendl; ++ return -1; ++ } ++ ++ if(ret != message->size()) { ++ ldout(cct, 20) << "write fails : ret != send_bytes " << dendl; ++ return -1; ++ } ++ ++ // hard code TODO ++ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); ++ if(ec == boost::asio::error::eof) { ++ ldout(cct, 20) << "recv eof" << dendl; ++ return -1; ++ } ++ ++ if(ec) { ++ ldout(cct, 20) << "write fails : " << ec.message() << dendl; ++ return -1; ++ } ++ ++ if(ret != RBDSC_MSG_LEN) { ++ ldout(cct, 20) << "write fails : ret != receive bytes " << dendl; ++ return -1; ++ } ++ ++ m_client_process_msg(std::string(m_recv_buffer, ret)); ++ ++ delete message; ++ ++ ldout(cct, 20) << "register volume success" << dendl; ++ ++ // TODO ++ m_session_work.store(true); ++ ++ return 0; ++ } ++ ++ // if occur any error, we just return false. Then read from rados. ++ int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { ++ rbdsc_req_type_t *message = new rbdsc_req_type_t(); ++ message->type = RBDSC_READ; ++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); ++ memcpy(message->vol_name, object_id.c_str(), object_id.size()); ++ message->vol_size = 0; ++ message->offset = 0; ++ message->length = 0; ++ ++ boost::asio::async_write(m_dm_socket, ++ boost::asio::buffer((char*)message, message->size()), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ [this, on_finish, message](const boost::system::error_code& err, size_t cb) { ++ delete message; ++ if(err) { ++ ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ if(cb != RBDSC_MSG_LEN) { ++ ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ get_result(on_finish); ++ }); ++ ++ return 0; ++ } ++ ++ void CacheClient::get_result(Context* on_finish) { ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ [this, on_finish](const boost::system::error_code& err, size_t cb) { ++ if(err == boost::asio::error::eof) { ++ ldout(cct, 20) << "get_result: ack is EOF." << dendl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ if(err) { ++ ldout(cct, 20) << "get_result: async_read fails:" << err.message() << dendl; ++ close(); ++ on_finish->complete(false); // TODO replace this assert with some metohds. ++ return; ++ } ++ if (cb != RBDSC_MSG_LEN) { ++ close(); ++ ldout(cct, 20) << "get_result: in-complete ack." << dendl; ++ on_finish->complete(false); // TODO: replace this assert with some methods. ++ } ++ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); ++ ++ // TODO: re-occur yuan's bug ++ if(io_ctx->type == RBDSC_READ) { ++ ldout(cct, 20) << "get rbdsc_read... " << dendl; ++ assert(0); ++ } ++ ++ if (io_ctx->type == RBDSC_READ_REPLY) { ++ on_finish->complete(true); ++ return; ++ } else { ++ on_finish->complete(false); ++ return; ++ } ++ }); ++ } ++ ++} // namespace immutable_obj_cache ++} // namespace ceph +diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h +new file mode 100644 +index 0000000..d82ab8f +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheClient.h +@@ -0,0 +1,53 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_CLIENT_H ++#define CEPH_CACHE_CLIENT_H ++ ++#include <atomic> ++#include <boost/asio.hpp> ++#include <boost/bind.hpp> ++#include <boost/asio/error.hpp> ++#include <boost/algorithm/string.hpp> ++#include "librbd/ImageCtx.h" ++#include "include/assert.h" ++#include "include/Context.h" ++#include "SocketCommon.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class CacheClient { ++public: ++ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx); ++ void run(); ++ bool is_session_work(); ++ ++ void close(); ++ int connect(); ++ ++ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size); ++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish); ++ void get_result(Context* on_finish); ++ ++private: ++ boost::asio::io_service m_io_service; ++ boost::asio::io_service::work m_io_service_work; ++ stream_protocol::socket m_dm_socket; ++ ClientProcessMsg m_client_process_msg; ++ stream_protocol::endpoint m_ep; ++ char m_recv_buffer[1024]; ++ ++ // atomic modfiy for this variable. ++ // thread 1 : asio callback thread modify it. ++ // thread 2 : librbd read it. ++ std::atomic<bool> m_session_work; ++ CephContext* cct; ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc +new file mode 100644 +index 0000000..cb636d2 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheController.cc +@@ -0,0 +1,117 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "CacheController.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \ ++ << __func__ << ": " ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class ThreadPoolSingleton : public ThreadPool { ++public: ++ ContextWQ *op_work_queue; ++ ++ explicit ThreadPoolSingleton(CephContext *cct) ++ : ThreadPool(cct, "ceph::cache::thread_pool", "tp_librbd_cache", 32, ++ "pcache_threads"), ++ op_work_queue(new ContextWQ("ceph::pcache_op_work_queue", ++ cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), ++ this)) { ++ start(); ++ } ++ ~ThreadPoolSingleton() override { ++ op_work_queue->drain(); ++ delete op_work_queue; ++ ++ stop(); ++ } ++}; ++ ++ ++CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): ++ m_args(args), m_cct(cct) { ++ ++} ++ ++CacheController::~CacheController() { ++ ++} ++ ++int CacheController::init() { ++ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( ++ "ceph::cache::thread_pool", false, m_cct); ++ pcache_op_work_queue = thread_pool_singleton->op_work_queue; ++ ++ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); ++ //TODO(): make this configurable ++ int r = m_object_cache_store->init(true); ++ if (r < 0) { ++ lderr(m_cct) << "init error\n" << dendl; ++ } ++ return r; ++} ++ ++int CacheController::shutdown() { ++ int r = m_object_cache_store->shutdown(); ++ return r; ++} ++ ++void CacheController::handle_signal(int signum){} ++ ++void CacheController::run() { ++ try { ++ //TODO(): use new socket path ++ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); ++ std::remove(controller_path.c_str()); ++ ++ m_cache_server = new CacheServer(controller_path, ++ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); ++ m_cache_server->run(); ++ } catch (std::exception& e) { ++ lderr(m_cct) << "Exception: " << e.what() << dendl; ++ } ++} ++ ++void CacheController::handle_request(uint64_t session_id, std::string msg){ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); ++ ++ int ret = 0; ++ ++ switch (io_ctx->type) { ++ case RBDSC_REGISTER: { ++ // init cache layout for volume ++ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); ++ io_ctx->type = RBDSC_REGISTER_REPLY; ++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); ++ ++ break; ++ } ++ case RBDSC_READ: { ++ // lookup object in local cache store ++ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); ++ if (ret < 0) { ++ io_ctx->type = RBDSC_READ_RADOS; ++ } else { ++ io_ctx->type = RBDSC_READ_REPLY; ++ } ++ if (io_ctx->type != RBDSC_READ_REPLY) { ++ assert(0); ++ } ++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); ++ ++ break; ++ } ++ ldout(m_cct, 5) << "can't recongize request" << dendl; ++ assert(0); // TODO replace it. ++ } ++} ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ ++ +diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h +new file mode 100644 +index 0000000..837fe36 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheController.h +@@ -0,0 +1,53 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_CONTROLLER_H ++#define CEPH_CACHE_CONTROLLER_H ++ ++#include "common/Formatter.h" ++#include "common/admin_socket.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "common/WorkQueue.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "include/assert.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++#include "CacheServer.h" ++#include "ObjectCacheStore.h" ++ ++#include <thread> ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class CacheController { ++ public: ++ CacheController(CephContext *cct, const std::vector<const char*> &args); ++ ~CacheController(); ++ ++ int init(); ++ ++ int shutdown(); ++ ++ void handle_signal(int sinnum); ++ ++ void run(); ++ ++ void handle_request(uint64_t sesstion_id, std::string msg); ++ ++ private: ++ CacheServer *m_cache_server; ++ std::vector<const char*> m_args; ++ CephContext *m_cct; ++ ObjectCacheStore *m_object_cache_store; ++ ContextWQ* pcache_op_work_queue; ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc +new file mode 100644 +index 0000000..dd2d47e +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheServer.cc +@@ -0,0 +1,99 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/debug.h" ++#include "common/ceph_context.h" ++#include "CacheServer.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \ ++ << __func__ << ": " ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++CacheServer::CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) ++ : cct(cct), m_server_process_msg(processmsg), ++ m_local_path(file), m_acceptor(m_io_service) {} ++ ++CacheServer::~CacheServer(){} ++ ++void CacheServer::run() { ++ bool ret; ++ ret = start_accept(); ++ if(!ret) { ++ return; ++ } ++ m_io_service.run(); ++} ++ ++// TODO : use callback to replace this function. ++void CacheServer::send(uint64_t session_id, std::string msg) { ++ auto it = m_session_map.find(session_id); ++ if (it != m_session_map.end()) { ++ it->second->send(msg); ++ } else { ++ // TODO : why don't find existing session id ? ++ ldout(cct, 20) << "don't find session id..." << dendl; ++ assert(0); ++ } ++} ++ ++// when creating one acceptor, can control every step in this way. ++bool CacheServer::start_accept() { ++ boost::system::error_code ec; ++ m_acceptor.open(m_local_path.protocol(), ec); ++ if(ec) { ++ ldout(cct, 20) << "m_acceptor open fails: " << ec.message() << dendl; ++ return false; ++ } ++ ++ // TODO control acceptor attribute. ++ ++ m_acceptor.bind(m_local_path, ec); ++ if(ec) { ++ ldout(cct, 20) << "m_acceptor bind fails: " << ec.message() << dendl; ++ return false; ++ } ++ ++ m_acceptor.listen(boost::asio::socket_base::max_connections, ec); ++ if(ec) { ++ ldout(cct, 20) << "m_acceptor listen fails: " << ec.message() << dendl; ++ return false; ++ } ++ ++ accept(); ++ return true; ++} ++ ++void CacheServer::accept() { ++ CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct)); ++ m_acceptor.async_accept(new_session->socket(), ++ boost::bind(&CacheServer::handle_accept, this, new_session, ++ boost::asio::placeholders::error)); ++} ++ ++void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) { ++ ++ if(error) { ++ lderr(cct) << "async accept fails : " << error.message() << dendl; ++ assert(0); // TODO ++ } ++ ++ m_session_map.emplace(m_session_id, new_session); ++ // TODO : session setting ++ new_session->start(); ++ m_session_id++; ++ ++ // lanuch next accept ++ accept(); ++} ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ +diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h +new file mode 100644 +index 0000000..6c5c133 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheServer.h +@@ -0,0 +1,54 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_SERVER_H ++#define CEPH_CACHE_SERVER_H ++ ++#include <cstdio> ++#include <iostream> ++#include <array> ++#include <memory> ++#include <string> ++#include <boost/bind.hpp> ++#include <boost/asio.hpp> ++#include <boost/asio/error.hpp> ++#include <boost/algorithm/string.hpp> ++ ++#include "include/assert.h" ++#include "SocketCommon.h" ++#include "CacheSession.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class CacheServer { ++ ++ public: ++ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct); ++ ~CacheServer(); ++ ++ void run(); ++ void send(uint64_t session_id, std::string msg); ++ ++ private: ++ bool start_accept(); ++ void accept(); ++ void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error); ++ ++ private: ++ CephContext* cct; ++ boost::asio::io_service m_io_service; // TODO wrapper it. ++ ProcessMsg m_server_process_msg; ++ stream_protocol::endpoint m_local_path; ++ stream_protocol::acceptor m_acceptor; ++ uint64_t m_session_id = 1; ++ std::map<uint64_t, CacheSessionPtr> m_session_map; ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc +new file mode 100644 +index 0000000..6cffb41 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheSession.cc +@@ -0,0 +1,115 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/debug.h" ++#include "common/ceph_context.h" ++#include "CacheSession.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \ ++ << __func__ << ": " ++ ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct) ++ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct) ++ {} ++ ++CacheSession::~CacheSession(){} ++ ++stream_protocol::socket& CacheSession::socket() { ++ return m_dm_socket; ++} ++ ++void CacheSession::start() { ++ if(true) { ++ serial_handing_request(); ++ } else { ++ parallel_handing_request(); ++ } ++} ++// flow: ++// ++// recv request --> process request --> reply ack ++// | | ++// --------------<------------------------- ++void CacheSession::serial_handing_request() { ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ boost::bind(&CacheSession::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++} ++ ++// flow : ++// ++// --> thread 1: process request ++// recv request --> thread 2: process request --> reply ack ++// --> thread n: process request ++// ++void CacheSession::parallel_handing_request() { ++ // TODO ++} ++ ++void CacheSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred) { ++ // when recv eof, the most proble is that client side close socket. ++ // so, server side need to end handing_request ++ if(error == boost::asio::error::eof) { ++ ldout(cct, 20) << "session: async_read : " << error.message() << dendl; ++ return; ++ } ++ ++ if(error) { ++ ldout(cct, 20) << "session: async_read fails: " << error.message() << dendl; ++ assert(0); ++ } ++ ++ if(bytes_transferred != RBDSC_MSG_LEN) { ++ ldout(cct, 20) << "session : request in-complete. "<<dendl; ++ assert(0); ++ } ++ ++ // TODO async_process can increse coding readable. ++ // process_msg_callback call handle async_send ++ process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); ++} ++ ++void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) { ++ if (error) { ++ ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl; ++ assert(0); ++ } ++ ++ if(bytes_transferred != RBDSC_MSG_LEN) { ++ ldout(cct, 20) << "session : reply in-complete. "<<dendl; ++ assert(0); ++ } ++ ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ boost::bind(&CacheSession::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ ++} ++ ++void CacheSession::send(std::string msg) { ++ boost::asio::async_write(m_dm_socket, ++ boost::asio::buffer(msg.c_str(), msg.size()), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ boost::bind(&CacheSession::handle_write, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ ++} ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ +diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h +new file mode 100644 +index 0000000..ce2591b +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheSession.h +@@ -0,0 +1,58 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_SESSION_H ++#define CEPH_CACHE_SESSION_H ++ ++#include <iostream> ++#include <string> ++#include <boost/bind.hpp> ++#include <boost/asio.hpp> ++#include <boost/asio/error.hpp> ++#include <boost/algorithm/string.hpp> ++ ++#include "include/assert.h" ++#include "SocketCommon.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class CacheSession : public std::enable_shared_from_this<CacheSession> { ++public: ++ CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct); ++ ~CacheSession(); ++ ++ stream_protocol::socket& socket(); ++ void start(); ++ void serial_handing_request(); ++ void parallel_handing_request(); ++ ++private: ++ ++ void handle_read(const boost::system::error_code& error, size_t bytes_transferred); ++ ++ void handle_write(const boost::system::error_code& error, size_t bytes_transferred); ++ ++public: ++ void send(std::string msg); ++ ++private: ++ uint64_t m_session_id; ++ stream_protocol::socket m_dm_socket; ++ ProcessMsg process_msg; ++ CephContext* cct; ++ ++ // Buffer used to store data received from the client. ++ //std::array<char, 1024> data_; ++ char m_buffer[1024]; ++}; ++ ++typedef std::shared_ptr<CacheSession> CacheSessionPtr; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc +new file mode 100644 +index 0000000..50721ca +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc +@@ -0,0 +1,172 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "ObjectCacheStore.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \ ++ << __func__ << ": " ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) ++ : m_cct(cct), m_work_queue(work_queue), ++ m_rados(new librados::Rados()) { ++ ++ uint64_t object_cache_entries = ++ cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); ++ ++ //TODO(): allow to set level ++ m_policy = new SimplePolicy(object_cache_entries, 0.5); ++} ++ ++ObjectCacheStore::~ObjectCacheStore() { ++ delete m_policy; ++} ++ ++int ObjectCacheStore::init(bool reset) { ++ ++ int ret = m_rados->init_with_context(m_cct); ++ if(ret < 0) { ++ lderr(m_cct) << "fail to init Ceph context" << dendl; ++ return ret; ++ } ++ ++ ret = m_rados->connect(); ++ if(ret < 0 ) { ++ lderr(m_cct) << "fail to conect to cluster" << dendl; ++ return ret; ++ } ++ ++ std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); ++ //TODO(): check and reuse existing cache objects ++ if(reset) { ++ std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; ++ //TODO(): to use std::filesystem ++ int r = system(cmd.c_str()); ++ } ++ ++ evict_thd = new std::thread([this]{this->evict_thread_body();}); ++ return ret; ++} ++ ++int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { ++ int ret = 0; ++ std::string cache_file_name = pool_name + object_name; ++ ++ //TODO(): lock on ioctx map ++ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { ++ librados::IoCtx* io_ctx = new librados::IoCtx(); ++ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); ++ if (ret < 0) { ++ lderr(m_cct) << "fail to create ioctx" << dendl; ++ assert(0); ++ } ++ m_ioctxs.emplace(pool_name, io_ctx); ++ } ++ ++ assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); ++ ++ librados::IoCtx* ioctx = m_ioctxs[pool_name]; ++ ++ librados::bufferlist* read_buf = new librados::bufferlist(); ++ int object_size = 4096*1024; //TODO(): read config from image metadata ++ ++ //TODO(): async promote ++ ret = promote_object(ioctx, object_name, read_buf, object_size); ++ if (ret == -ENOENT) { ++ read_buf->append(std::string(object_size, '0')); ++ ret = 0; ++ } ++ ++ if( ret < 0) { ++ lderr(m_cct) << "fail to read from rados" << dendl; ++ return ret; ++ } ++ ++ // persistent to cache ++ librbd::cache::SyncFile cache_file(m_cct, cache_file_name); ++ cache_file.open(); ++ ret = cache_file.write_object_to_file(*read_buf, object_size); ++ ++ // update metadata ++ assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); ++ m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); ++ assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); ++ ++ return ret; ++ ++} ++ ++// return -1, client need to read data from cluster. ++// return 0, client directly read data from cache. ++int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { ++ ++ std::string cache_file_name = pool_name + object_name; ++ ++ CACHESTATUS ret; ++ ret = m_policy->lookup_object(cache_file_name); ++ ++ switch(ret) { ++ case OBJ_CACHE_NONE: ++ return do_promote(pool_name, object_name); ++ case OBJ_CACHE_PROMOTED: ++ return 0; ++ case OBJ_CACHE_PROMOTING: ++ default: ++ return -1; ++ } ++} ++ ++void ObjectCacheStore::evict_thread_body() { ++ int ret; ++ while(m_evict_go) { ++ ret = evict_objects(); ++ } ++} ++ ++ ++int ObjectCacheStore::shutdown() { ++ m_evict_go = false; ++ evict_thd->join(); ++ m_rados->shutdown(); ++ return 0; ++} ++ ++int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { ++ return 0; ++} ++ ++int ObjectCacheStore::lock_cache(std::string vol_name) { ++ return 0; ++} ++ ++int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { ++ int ret; ++ ++ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); ++ ++ ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); ++ if(ret < 0) { ++ lderr(m_cct) << "fail to read from rados" << dendl; ++ return ret; ++ } ++ read_completion->wait_for_complete(); ++ ret = read_completion->get_return_value(); ++ return ret; ++ ++} ++ ++int ObjectCacheStore::evict_objects() { ++ std::list<std::string> obj_list; ++ m_policy->get_evict_list(&obj_list); ++ for (auto& obj: obj_list) { ++ //do_evict(obj); ++ } ++} ++ ++} // namespace immutable_obj_cache ++} // namespace ceph +diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h +new file mode 100644 +index 0000000..d044b27 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h +@@ -0,0 +1,70 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H ++#define CEPH_CACHE_OBJECT_CACHE_STORE_H ++ ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++#include "librbd/cache/SharedPersistentObjectCacherFile.h" ++#include "SimplePolicy.hpp" ++ ++ ++using librados::Rados; ++using librados::IoCtx; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++typedef shared_ptr<librados::Rados> RadosRef; ++typedef shared_ptr<librados::IoCtx> IoCtxRef; ++ ++class ObjectCacheStore ++{ ++ public: ++ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); ++ ~ObjectCacheStore(); ++ ++ int init(bool reset); ++ ++ int shutdown(); ++ ++ int lookup_object(std::string pool_name, std::string object_name); ++ ++ int init_cache(std::string vol_name, uint64_t vol_size); ++ ++ int lock_cache(std::string vol_name); ++ ++ private: ++ void evict_thread_body(); ++ int evict_objects(); ++ ++ int do_promote(std::string pool_name, std::string object_name); ++ ++ int promote_object(librados::IoCtx*, std::string object_name, ++ librados::bufferlist* read_buf, ++ uint64_t length); ++ ++ CephContext *m_cct; ++ ContextWQ* m_work_queue; ++ RadosRef m_rados; ++ ++ ++ std::map<std::string, librados::IoCtx*> m_ioctxs; ++ ++ librbd::cache::SyncFile *m_cache_file; ++ ++ Policy* m_policy; ++ std::thread* evict_thd; ++ bool m_evict_go = false; ++}; ++ ++} // namespace ceph ++} // namespace immutable_obj_cache ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/Policy.hpp b/src/tools/ceph_immutable_object_cache/Policy.hpp +new file mode 100644 +index 0000000..8090202 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/Policy.hpp +@@ -0,0 +1,33 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_POLICY_HPP ++#define CEPH_CACHE_POLICY_HPP ++ ++#include <list> ++#include <string> ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++enum CACHESTATUS { ++ OBJ_CACHE_NONE = 0, ++ OBJ_CACHE_PROMOTING, ++ OBJ_CACHE_PROMOTED, ++}; ++ ++ ++class Policy { ++public: ++ Policy(){} ++ virtual ~Policy(){}; ++ virtual CACHESTATUS lookup_object(std::string) = 0; ++ virtual int evict_object(std::string&) = 0; ++ virtual void update_status(std::string, CACHESTATUS) = 0; ++ virtual CACHESTATUS get_status(std::string) = 0; ++ virtual void get_evict_list(std::list<std::string>* obj_list) = 0; ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp +new file mode 100644 +index 0000000..757ee6a +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp +@@ -0,0 +1,163 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP ++#define CEPH_CACHE_SIMPLE_POLICY_HPP ++ ++#include "Policy.hpp" ++#include "include/lru.h" ++#include "common/RWLock.h" ++#include "common/Mutex.h" ++ ++#include <vector> ++#include <unordered_map> ++#include <string> ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++ ++class SimplePolicy : public Policy { ++public: ++ SimplePolicy(uint64_t block_num, float watermark) ++ : m_watermark(watermark), m_entry_count(block_num), ++ m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), ++ m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") ++ { ++ ++ for(uint64_t i = 0; i < m_entry_count; i++) { ++ m_free_list.push_back(new Entry()); ++ } ++ ++ } ++ ++ ~SimplePolicy() { ++ for(uint64_t i = 0; i < m_entry_count; i++) { ++ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); ++ delete entry; ++ m_free_list.pop_front(); ++ } ++ } ++ ++ CACHESTATUS lookup_object(std::string cache_file_name) { ++ ++ //TODO(): check race condition ++ RWLock::WLocker wlocker(m_cache_map_lock); ++ ++ auto entry_it = m_cache_map.find(cache_file_name); ++ if(entry_it == m_cache_map.end()) { ++ Mutex::Locker locker(m_free_list_lock); ++ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); ++ assert(entry != nullptr); ++ m_free_list.pop_front(); ++ entry->status = OBJ_CACHE_PROMOTING; ++ ++ m_cache_map[cache_file_name] = entry; ++ ++ return OBJ_CACHE_NONE; ++ } ++ ++ Entry* entry = entry_it->second; ++ ++ if(entry->status == OBJ_CACHE_PROMOTED) { ++ // touch it ++ m_promoted_lru.lru_touch(entry); ++ } ++ ++ return entry->status; ++ } ++ ++ int evict_object(std::string& out_cache_file_name) { ++ RWLock::WLocker locker(m_cache_map_lock); ++ ++ return 1; ++ } ++ ++ // TODO(): simplify the logic ++ void update_status(std::string file_name, CACHESTATUS new_status) { ++ RWLock::WLocker locker(m_cache_map_lock); ++ ++ Entry* entry; ++ auto entry_it = m_cache_map.find(file_name); ++ ++ // just check. ++ if(new_status == OBJ_CACHE_PROMOTING) { ++ assert(entry_it == m_cache_map.end()); ++ } ++ ++ assert(entry_it != m_cache_map.end()); ++ ++ entry = entry_it->second; ++ ++ // promoting is done, so update it. ++ if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { ++ m_promoted_lru.lru_insert_top(entry); ++ entry->status = new_status; ++ return; ++ } ++ ++ assert(0); ++ } ++ ++ // get entry status ++ CACHESTATUS get_status(std::string file_name) { ++ RWLock::RLocker locker(m_cache_map_lock); ++ auto entry_it = m_cache_map.find(file_name); ++ if(entry_it == m_cache_map.end()) { ++ return OBJ_CACHE_NONE; ++ } ++ ++ return entry_it->second->status; ++ } ++ ++ void get_evict_list(std::list<std::string>* obj_list) { ++ RWLock::WLocker locker(m_cache_map_lock); ++ // check free ratio, pop entries from LRU ++ if (m_free_list.size() / m_entry_count < m_watermark) { ++ int evict_num = 10; //TODO(): make this configurable ++ for(int i = 0; i < evict_num; i++) { ++ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); ++ if (entry == nullptr) { ++ continue; ++ } ++ std::string file_name = entry->cache_file_name; ++ obj_list->push_back(file_name); ++ ++ auto entry_it = m_cache_map.find(file_name); ++ m_cache_map.erase(entry_it); ++ ++ //mark this entry as free ++ entry->status = OBJ_CACHE_NONE; ++ Mutex::Locker locker(m_free_list_lock); ++ m_free_list.push_back(entry); ++ } ++ } ++ } ++ ++private: ++ ++ class Entry : public LRUObject { ++ public: ++ CACHESTATUS status; ++ Entry() : status(OBJ_CACHE_NONE){} ++ std::string cache_file_name; ++ void encode(bufferlist &bl){} ++ void decode(bufferlist::iterator &it){} ++ }; ++ ++ float m_watermark; ++ uint64_t m_entry_count; ++ ++ std::unordered_map<std::string, Entry*> m_cache_map; ++ RWLock m_cache_map_lock; ++ ++ std::deque<Entry*> m_free_list; ++ Mutex m_free_list_lock; ++ ++ LRU m_promoted_lru; // include promoted, using status. ++ ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h +new file mode 100644 +index 0000000..53dca54 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/SocketCommon.h +@@ -0,0 +1,54 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_SOCKET_COMMON_H ++#define CEPH_CACHE_SOCKET_COMMON_H ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++static const int RBDSC_REGISTER = 0X11; ++static const int RBDSC_READ = 0X12; ++static const int RBDSC_LOOKUP = 0X13; ++static const int RBDSC_REGISTER_REPLY = 0X14; ++static const int RBDSC_READ_REPLY = 0X15; ++static const int RBDSC_LOOKUP_REPLY = 0X16; ++static const int RBDSC_READ_RADOS = 0X17; ++ ++ ++ ++typedef std::function<void(uint64_t, std::string)> ProcessMsg; ++typedef std::function<void(std::string)> ClientProcessMsg; ++typedef uint8_t rbdsc_req_type; ++ ++//TODO(): switch to bufferlist ++struct rbdsc_req_type_t { ++ rbdsc_req_type type; ++ uint64_t vol_size; ++ uint64_t offset; ++ uint64_t length; ++ char pool_name[256]; ++ char vol_name[256]; ++ ++ uint64_t size() { ++ return sizeof(rbdsc_req_type_t); ++ } ++ ++ std::string to_buffer() { ++ std::stringstream ss; ++ ss << type; ++ ss << vol_size; ++ ss << offset; ++ ss << length; ++ ss << pool_name; ++ ss << vol_name; ++ ++ return ss.str(); ++ } ++}; ++ ++static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc +new file mode 100644 +index 0000000..7a9131d +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/main.cc +@@ -0,0 +1,85 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/ceph_argparse.h" ++#include "common/config.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "global/global_init.h" ++#include "global/signal_handler.h" ++#include "CacheController.h" ++ ++#include <vector> ++ ++ceph::immutable_obj_cache::CacheController *cachectl = nullptr; ++ ++void usage() { ++ std::cout << "usage: cache controller [options...]" << std::endl; ++ std::cout << "options:\n"; ++ std::cout << " -m monaddress[:port] connect to specified monitor\n"; ++ std::cout << " --keyring=<path> path to keyring for local cluster\n"; ++ std::cout << " --log-file=<logfile> file to log debug output\n"; ++ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; ++ generic_server_usage(); ++} ++ ++static void handle_signal(int signum) ++{ ++ if (cachectl) ++ cachectl->handle_signal(signum); ++} ++ ++int main(int argc, const char **argv) ++{ ++ std::vector<const char*> args; ++ env_to_vec(args); ++ argv_to_vec(argc, argv, args); ++ ++ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, ++ CODE_ENVIRONMENT_DAEMON, ++ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); ++ ++ for (auto i = args.begin(); i != args.end(); ++i) { ++ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { ++ usage(); ++ return EXIT_SUCCESS; ++ } ++ } ++ ++ if (g_conf()->daemonize) { ++ global_init_daemonize(g_ceph_context); ++ } ++ g_ceph_context->enable_perf_counter(); ++ ++ common_init_finish(g_ceph_context); ++ ++ init_async_signal_handler(); ++ register_async_signal_handler(SIGHUP, sighup_handler); ++ register_async_signal_handler_oneshot(SIGINT, handle_signal); ++ register_async_signal_handler_oneshot(SIGTERM, handle_signal); ++ ++ std::vector<const char*> cmd_args; ++ argv_to_vec(argc, argv, cmd_args); ++ ++ // disable unnecessary librbd cache ++ g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); ++ ++ cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args); ++ int r = cachectl->init(); ++ if (r < 0) { ++ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; ++ goto cleanup; ++ } ++ ++ cachectl->run(); ++ ++ cleanup: ++ unregister_async_signal_handler(SIGHUP, sighup_handler); ++ unregister_async_signal_handler(SIGINT, handle_signal); ++ unregister_async_signal_handler(SIGTERM, handle_signal); ++ shutdown_async_signal_handler(); ++ ++ delete cachectl; ++ ++ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; ++} +diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt +deleted file mode 100644 +index 597d802..0000000 +--- a/src/tools/rbd_cache/CMakeLists.txt ++++ /dev/null +@@ -1,9 +0,0 @@ +-add_executable(rbd-cache +- ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc +- ObjectCacheStore.cc +- CacheController.cc +- main.cc) +-target_link_libraries(rbd-cache +- librados +- global) +-install(TARGETS rbd-cache DESTINATION bin) +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +deleted file mode 100644 +index 620192c..0000000 +--- a/src/tools/rbd_cache/CacheController.cc ++++ /dev/null +@@ -1,116 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "CacheController.h" +- +-#define dout_context g_ceph_context +-#define dout_subsys ceph_subsys_rbd_cache +-#undef dout_prefix +-#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ +- << __func__ << ": " +- +-namespace rbd { +-namespace cache { +- +-class ThreadPoolSingleton : public ThreadPool { +-public: +- ContextWQ *op_work_queue; +- +- explicit ThreadPoolSingleton(CephContext *cct) +- : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, +- "pcache_threads"), +- op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", +- cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), +- this)) { +- start(); +- } +- ~ThreadPoolSingleton() override { +- op_work_queue->drain(); +- delete op_work_queue; +- +- stop(); +- } +-}; +- +- +-CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): +- m_args(args), m_cct(cct) { +- +-} +- +-CacheController::~CacheController() { +- +-} +- +-int CacheController::init() { +- ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( +- "rbd::cache::thread_pool", false, m_cct); +- pcache_op_work_queue = thread_pool_singleton->op_work_queue; +- +- m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); +- int r = m_object_cache_store->init(false); +- if (r < 0) { +- //derr << "init error\n" << dendl; +- } +- return r; +-} +- +-int CacheController::shutdown() { +- int r = m_object_cache_store->shutdown(); +- return r; +-} +- +-void CacheController::handle_signal(int signum){} +- +-void CacheController::run() { +- try { +- //TODO(): use new socket path +- std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); +- std::remove(controller_path.c_str()); +- +- m_cache_server = new CacheServer(controller_path, +- ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); +- m_cache_server->run(); +- } catch (std::exception& e) { +- std::cerr << "Exception: " << e.what() << "\n"; +- } +-} +- +-void CacheController::handle_request(uint64_t session_id, std::string msg){ +- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); +- +- int ret = 0; +- +- switch (io_ctx->type) { +- case RBDSC_REGISTER: { +- // init cache layout for volume +- m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); +- io_ctx->type = RBDSC_REGISTER_REPLY; +- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); +- +- break; +- } +- case RBDSC_READ: { +- // lookup object in local cache store +- ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); +- if (ret < 0) { +- io_ctx->type = RBDSC_READ_RADOS; +- } else { +- io_ctx->type = RBDSC_READ_REPLY; +- } +- if (io_ctx->type != RBDSC_READ_REPLY) { +- assert(0); +- } +- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); +- +- break; +- } +- std::cout<<"can't recongize request"<<std::endl; +- assert(0); // TODO replace it. +- } +-} +- +-} // namespace rbd +-} // namespace cache +- +- +diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h +deleted file mode 100644 +index 0e23484..0000000 +--- a/src/tools/rbd_cache/CacheController.h ++++ /dev/null +@@ -1,54 +0,0 @@ +-#ifndef CACHE_CONTROLLER_H +-#define CACHE_CONTROLLER_H +- +-#include <thread> +- +-#include "common/Formatter.h" +-#include "common/admin_socket.h" +-#include "common/debug.h" +-#include "common/errno.h" +-#include "common/ceph_context.h" +-#include "common/Mutex.h" +-#include "common/WorkQueue.h" +-#include "include/rados/librados.hpp" +-#include "include/rbd/librbd.h" +-#include "include/assert.h" +-#include "librbd/ImageCtx.h" +-#include "librbd/ImageState.h" +- +-#include "CacheControllerSocket.hpp" +-#include "ObjectCacheStore.h" +- +- +-using boost::asio::local::stream_protocol; +- +-namespace rbd { +-namespace cache { +- +-class CacheController { +- public: +- CacheController(CephContext *cct, const std::vector<const char*> &args); +- ~CacheController(); +- +- int init(); +- +- int shutdown(); +- +- void handle_signal(int sinnum); +- +- void run(); +- +- void handle_request(uint64_t sesstion_id, std::string msg); +- +- private: +- CacheServer *m_cache_server; +- std::vector<const char*> m_args; +- CephContext *m_cct; +- ObjectCacheStore *m_object_cache_store; +- ContextWQ* pcache_op_work_queue; +-}; +- +-} // namespace rbd +-} // namespace cache +- +-#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +deleted file mode 100644 +index 2ff7477..0000000 +--- a/src/tools/rbd_cache/CacheControllerSocket.hpp ++++ /dev/null +@@ -1,228 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CACHE_CONTROLLER_SOCKET_H +-#define CACHE_CONTROLLER_SOCKET_H +- +-#include <cstdio> +-#include <iostream> +-#include <array> +-#include <memory> +-#include <string> +-#include <boost/bind.hpp> +-#include <boost/asio.hpp> +-#include <boost/asio/error.hpp> +-#include <boost/algorithm/string.hpp> +-#include "CacheControllerSocketCommon.h" +- +- +-using boost::asio::local::stream_protocol; +- +-namespace rbd { +-namespace cache { +- +-class session : public std::enable_shared_from_this<session> { +-public: +- session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) +- : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {} +- +- stream_protocol::socket& socket() { +- return m_dm_socket; +- } +- +- void start() { +- if(true) { +- serial_handing_request(); +- } else { +- parallel_handing_request(); +- } +- } +- // flow: +- // +- // recv request --> process request --> reply ack +- // | | +- // --------------<------------------------- +- void serial_handing_request() { +- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- boost::bind(&session::handle_read, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); +- } +- +- // flow : +- // +- // --> thread 1: process request +- // recv request --> thread 2: process request --> reply ack +- // --> thread n: process request +- // +- void parallel_handing_request() { +- // TODO +- } +- +-private: +- +- void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { +- // when recv eof, the most proble is that client side close socket. +- // so, server side need to end handing_request +- if(error == boost::asio::error::eof) { +- std::cout<<"session: async_read : " << error.message() << std::endl; +- return; +- } +- +- if(error) { +- std::cout<<"session: async_read fails: " << error.message() << std::endl; +- assert(0); +- } +- +- if(bytes_transferred != RBDSC_MSG_LEN) { +- std::cout<<"session : request in-complete. "<<std::endl; +- assert(0); +- } +- +- // TODO async_process can increse coding readable. +- // process_msg_callback call handle async_send +- process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); +- } +- +- void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { +- if (error) { +- std::cout<<"session: async_write fails: " << error.message() << std::endl; +- assert(0); +- } +- +- if(bytes_transferred != RBDSC_MSG_LEN) { +- std::cout<<"session : reply in-complete. "<<std::endl; +- assert(0); +- } +- +- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- boost::bind(&session::handle_read, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); +- +- } +- +-public: +- void send(std::string msg) { +- boost::asio::async_write(m_dm_socket, +- boost::asio::buffer(msg.c_str(), msg.size()), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- boost::bind(&session::handle_write, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); +- +- } +- +-private: +- uint64_t m_session_id; +- stream_protocol::socket m_dm_socket; +- ProcessMsg process_msg; +- +- // Buffer used to store data received from the client. +- //std::array<char, 1024> data_; +- char m_buffer[1024]; +-}; +- +-typedef std::shared_ptr<session> session_ptr; +- +-class CacheServer { +-public: +- CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) +- : m_cct(cct), m_server_process_msg(processmsg), +- m_local_path(file), +- m_acceptor(m_io_service) +- {} +- +- void run() { +- bool ret; +- ret = start_accept(); +- if(!ret) { +- return; +- } +- m_io_service.run(); +- } +- +- // TODO : use callback to replace this function. +- void send(uint64_t session_id, std::string msg) { +- auto it = m_session_map.find(session_id); +- if (it != m_session_map.end()) { +- it->second->send(msg); +- } else { +- // TODO : why don't find existing session id ? +- std::cout<<"don't find session id..."<<std::endl; +- assert(0); +- } +- } +- +-private: +- // when creating one acceptor, can control every step in this way. +- bool start_accept() { +- boost::system::error_code ec; +- m_acceptor.open(m_local_path.protocol(), ec); +- if(ec) { +- std::cout << "m_acceptor open fails: " << ec.message() << std::endl; +- return false; +- } +- +- // TODO control acceptor attribute. +- +- m_acceptor.bind(m_local_path, ec); +- if(ec) { +- std::cout << "m_acceptor bind fails: " << ec.message() << std::endl; +- return false; +- } +- +- m_acceptor.listen(boost::asio::socket_base::max_connections, ec); +- if(ec) { +- std::cout << "m_acceptor listen fails: " << ec.message() << std::endl; +- return false; +- } +- +- accept(); +- return true; +- } +- +- void accept() { +- session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg)); +- m_acceptor.async_accept(new_session->socket(), +- boost::bind(&CacheServer::handle_accept, this, new_session, +- boost::asio::placeholders::error)); +- } +- +- void handle_accept(session_ptr new_session, const boost::system::error_code& error) { +- //TODO(): open librbd snap ... yuan +- +- if(error) { +- std::cout << "async accept fails : " << error.message() << std::endl; +- assert(0); // TODO +- } +- +- // must put session into m_session_map at the front of session.start() +- m_session_map.emplace(m_session_id, new_session); +- // TODO : session setting +- new_session->start(); +- m_session_id++; +- +- // lanuch next accept +- accept(); +- } +- +-private: +- CephContext* m_cct; +- boost::asio::io_service m_io_service; // TODO wrapper it. +- ProcessMsg m_server_process_msg; +- stream_protocol::endpoint m_local_path; +- stream_protocol::acceptor m_acceptor; +- uint64_t m_session_id = 1; +- std::map<uint64_t, session_ptr> m_session_map; +-}; +- +-} // namespace cache +-} // namespace rbd +- +-#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +deleted file mode 100644 +index 964f888..0000000 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ /dev/null +@@ -1,229 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H +-#define CACHE_CONTROLLER_SOCKET_CLIENT_H +- +-#include <atomic> +-#include <boost/asio.hpp> +-#include <boost/bind.hpp> +-#include <boost/asio/error.hpp> +-#include <boost/algorithm/string.hpp> +-#include "librbd/ImageCtx.h" +-#include "include/assert.h" +-#include "include/Context.h" +-#include "CacheControllerSocketCommon.h" +- +- +-using boost::asio::local::stream_protocol; +- +-namespace rbd { +-namespace cache { +- +-class CacheClient { +-public: +- CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) +- : m_io_service_work(m_io_service), +- m_dm_socket(m_io_service), +- m_client_process_msg(processmsg), +- m_ep(stream_protocol::endpoint(file)), +- m_session_work(false), +- cct(ceph_ctx) +- { +- // TODO wrapper io_service +- std::thread thd([this](){ +- m_io_service.run();}); +- thd.detach(); +- } +- +- void run(){ +- } +- +- bool is_session_work() { +- return m_session_work.load() == true; +- } +- +- // just when error occur, call this method. +- void close() { +- m_session_work.store(false); +- boost::system::error_code close_ec; +- m_dm_socket.close(close_ec); +- if(close_ec) { +- std::cout << "close: " << close_ec.message() << std::endl; +- } +- std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl; +- } +- +- int connect() { +- boost::system::error_code ec; +- m_dm_socket.connect(m_ep, ec); +- if(ec) { +- if(ec == boost::asio::error::connection_refused) { +- std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. " +- << "Now data will be read from ceph cluster " << std::endl; +- } else { +- std::cout << "connect: " << ec.message() << std::endl; +- } +- +- if(m_dm_socket.is_open()) { +- // Set to indicate what error occurred, if any. +- // Note that, even if the function indicates an error, +- // the underlying descriptor is closed. +- boost::system::error_code close_ec; +- m_dm_socket.close(close_ec); +- if(close_ec) { +- std::cout << "close: " << close_ec.message() << std::endl; +- } +- } +- return -1; +- } +- +- std::cout<<"connect success"<<std::endl; +- +- return 0; +- } +- +- int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { +- // cache controller will init layout +- rbdsc_req_type_t *message = new rbdsc_req_type_t(); +- message->type = RBDSC_REGISTER; +- memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); +- memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); +- message->vol_size = vol_size; +- message->offset = 0; +- message->length = 0; +- +- uint64_t ret; +- boost::system::error_code ec; +- +- ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); +- if(ec) { +- std::cout << "write fails : " << ec.message() << std::endl; +- return -1; +- } +- +- if(ret != message->size()) { +- std::cout << "write fails : ret != send_bytes "<< std::endl; +- return -1; +- } +- +- // hard code TODO +- ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); +- if(ec == boost::asio::error::eof) { +- std::cout<< "recv eof"<<std::endl; +- return -1; +- } +- +- if(ec) { +- std::cout << "write fails : " << ec.message() << std::endl; +- return -1; +- } +- +- if(ret != RBDSC_MSG_LEN) { +- std::cout << "write fails : ret != receive bytes " << std::endl; +- return -1; +- } +- +- m_client_process_msg(std::string(m_recv_buffer, ret)); +- +- delete message; +- +- std::cout << "register volume success" << std::endl; +- +- // TODO +- m_session_work.store(true); +- +- return 0; +- } +- +- // if occur any error, we just return false. Then read from rados. +- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { +- rbdsc_req_type_t *message = new rbdsc_req_type_t(); +- message->type = RBDSC_READ; +- memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); +- memcpy(message->vol_name, object_id.c_str(), object_id.size()); +- message->vol_size = 0; +- message->offset = 0; +- message->length = 0; +- +- boost::asio::async_write(m_dm_socket, +- boost::asio::buffer((char*)message, message->size()), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- [this, on_finish, message](const boost::system::error_code& err, size_t cb) { +- delete message; +- if(err) { +- std::cout<< "lookup_object: async_write fails." << err.message() << std::endl; +- close(); +- on_finish->complete(false); +- return; +- } +- if(cb != RBDSC_MSG_LEN) { +- std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl; +- close(); +- on_finish->complete(false); +- return; +- } +- get_result(on_finish); +- }); +- +- return 0; +- } +- +- void get_result(Context* on_finish) { +- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- [this, on_finish](const boost::system::error_code& err, size_t cb) { +- if(err == boost::asio::error::eof) { +- std::cout<<"get_result: ack is EOF." << std::endl; +- close(); +- on_finish->complete(false); +- return; +- } +- if(err) { +- std::cout<< "get_result: async_read fails:" << err.message() << std::endl; +- close(); +- on_finish->complete(false); // TODO replace this assert with some metohds. +- return; +- } +- if (cb != RBDSC_MSG_LEN) { +- close(); +- std::cout << "get_result: in-complete ack." << std::endl; +- on_finish->complete(false); // TODO: replace this assert with some methods. +- } +- +- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); +- +- // TODO: re-occur yuan's bug +- if(io_ctx->type == RBDSC_READ) { +- std::cout << "get rbdsc_read... " << std::endl; +- assert(0); +- } +- +- if (io_ctx->type == RBDSC_READ_REPLY) { +- on_finish->complete(true); +- return; +- } else { +- on_finish->complete(false); +- return; +- } +- }); +- } +- +-private: +- boost::asio::io_service m_io_service; +- boost::asio::io_service::work m_io_service_work; +- stream_protocol::socket m_dm_socket; +- ClientProcessMsg m_client_process_msg; +- stream_protocol::endpoint m_ep; +- char m_recv_buffer[1024]; +- +- // atomic modfiy for this variable. +- // thread 1 : asio callback thread modify it. +- // thread 2 : librbd read it. +- std::atomic<bool> m_session_work; +- CephContext* cct; +-}; +- +-} // namespace cache +-} // namespace rbd +-#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +deleted file mode 100644 +index e17529a..0000000 +--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h ++++ /dev/null +@@ -1,62 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H +-#define CACHE_CONTROLLER_SOCKET_COMMON_H +- +-/* +-#define RBDSC_REGISTER 0X11 +-#define RBDSC_READ 0X12 +-#define RBDSC_LOOKUP 0X13 +-#define RBDSC_REGISTER_REPLY 0X14 +-#define RBDSC_READ_REPLY 0X15 +-#define RBDSC_LOOKUP_REPLY 0X16 +-#define RBDSC_READ_RADOS 0X17 +-*/ +- +-namespace rbd { +-namespace cache { +- +-static const int RBDSC_REGISTER = 0X11; +-static const int RBDSC_READ = 0X12; +-static const int RBDSC_LOOKUP = 0X13; +-static const int RBDSC_REGISTER_REPLY = 0X14; +-static const int RBDSC_READ_REPLY = 0X15; +-static const int RBDSC_LOOKUP_REPLY = 0X16; +-static const int RBDSC_READ_RADOS = 0X17; +- +- +- +-typedef std::function<void(uint64_t, std::string)> ProcessMsg; +-typedef std::function<void(std::string)> ClientProcessMsg; +-typedef uint8_t rbdsc_req_type; +-struct rbdsc_req_type_t { +- rbdsc_req_type type; +- uint64_t vol_size; +- uint64_t offset; +- uint64_t length; +- char pool_name[256]; +- char vol_name[256]; +- +- uint64_t size() { +- return sizeof(rbdsc_req_type_t); +- } +- +- std::string to_buffer() { +- std::stringstream ss; +- ss << type; +- ss << vol_size; +- ss << offset; +- ss << length; +- ss << pool_name; +- ss << vol_name; +- +- return ss.str(); +- } +-}; +- +-static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); +- +-} // namespace cache +-} // namespace rbd +-#endif +diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc +deleted file mode 100644 +index 99f90d6..0000000 +--- a/src/tools/rbd_cache/ObjectCacheStore.cc ++++ /dev/null +@@ -1,172 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "ObjectCacheStore.h" +- +-#define dout_context g_ceph_context +-#define dout_subsys ceph_subsys_rbd_cache +-#undef dout_prefix +-#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ +- << __func__ << ": " +- +-namespace rbd { +-namespace cache { +- +-ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) +- : m_cct(cct), m_work_queue(work_queue), +- m_rados(new librados::Rados()) { +- +- uint64_t object_cache_entries = +- cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); +- +- //TODO(): allow to set level +- m_policy = new SimplePolicy(object_cache_entries, 0.5); +-} +- +-ObjectCacheStore::~ObjectCacheStore() { +- delete m_policy; +-} +- +-int ObjectCacheStore::init(bool reset) { +- +- int ret = m_rados->init_with_context(m_cct); +- if(ret < 0) { +- lderr(m_cct) << "fail to init Ceph context" << dendl; +- return ret; +- } +- +- ret = m_rados->connect(); +- if(ret < 0 ) { +- lderr(m_cct) << "fail to conect to cluster" << dendl; +- return ret; +- } +- +- std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); +- //TODO(): check and reuse existing cache objects +- if(reset) { +- std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; +- //TODO(): to use std::filesystem +- int r = system(cmd.c_str()); +- } +- +- evict_thd = new std::thread([this]{this->evict_thread_body();}); +- return ret; +-} +- +-int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { +- int ret = 0; +- std::string cache_file_name = pool_name + object_name; +- +- //TODO(): lock on ioctx map +- if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { +- librados::IoCtx* io_ctx = new librados::IoCtx(); +- ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); +- if (ret < 0) { +- lderr(m_cct) << "fail to create ioctx" << dendl; +- assert(0); +- } +- m_ioctxs.emplace(pool_name, io_ctx); +- } +- +- assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); +- +- librados::IoCtx* ioctx = m_ioctxs[pool_name]; +- +- librados::bufferlist* read_buf = new librados::bufferlist(); +- int object_size = 4096*1024; //TODO(): read config from image metadata +- +- //TODO(): async promote +- ret = promote_object(ioctx, object_name, read_buf, object_size); +- if (ret == -ENOENT) { +- read_buf->append(std::string(object_size, '0')); +- ret = 0; +- } +- +- if( ret < 0) { +- lderr(m_cct) << "fail to read from rados" << dendl; +- return ret; +- } +- +- // persistent to cache +- librbd::cache::SyncFile cache_file(m_cct, cache_file_name); +- cache_file.open(); +- ret = cache_file.write_object_to_file(*read_buf, object_size); +- +- // update metadata +- assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); +- m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); +- assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); +- +- return ret; +- +-} +- +-// return -1, client need to read data from cluster. +-// return 0, client directly read data from cache. +-int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { +- +- std::string cache_file_name = pool_name + object_name; +- +- CACHESTATUS ret; +- ret = m_policy->lookup_object(cache_file_name); +- +- switch(ret) { +- case OBJ_CACHE_NONE: +- return do_promote(pool_name, object_name); +- case OBJ_CACHE_PROMOTED: +- return 0; +- case OBJ_CACHE_PROMOTING: +- default: +- return -1; +- } +-} +- +-void ObjectCacheStore::evict_thread_body() { +- int ret; +- while(m_evict_go) { +- ret = evict_objects(); +- } +-} +- +- +-int ObjectCacheStore::shutdown() { +- m_evict_go = false; +- evict_thd->join(); +- m_rados->shutdown(); +- return 0; +-} +- +-int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { +- return 0; +-} +- +-int ObjectCacheStore::lock_cache(std::string vol_name) { +- return 0; +-} +- +-int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { +- int ret; +- +- librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); +- +- ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); +- if(ret < 0) { +- lderr(m_cct) << "fail to read from rados" << dendl; +- return ret; +- } +- read_completion->wait_for_complete(); +- ret = read_completion->get_return_value(); +- return ret; +- +-} +- +-int ObjectCacheStore::evict_objects() { +- std::list<std::string> obj_list; +- m_policy->get_evict_list(&obj_list); +- for (auto& obj: obj_list) { +- //do_evict(obj); +- } +-} +- +-} // namespace cache +-} // namespace rbd +diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h +deleted file mode 100644 +index ba0e1f1..0000000 +--- a/src/tools/rbd_cache/ObjectCacheStore.h ++++ /dev/null +@@ -1,70 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef OBJECT_CACHE_STORE_H +-#define OBJECT_CACHE_STORE_H +- +-#include "common/debug.h" +-#include "common/errno.h" +-#include "common/ceph_context.h" +-#include "common/Mutex.h" +-#include "include/rados/librados.hpp" +-#include "include/rbd/librbd.h" +-#include "librbd/ImageCtx.h" +-#include "librbd/ImageState.h" +-#include "librbd/cache/SharedPersistentObjectCacherFile.h" +-#include "SimplePolicy.hpp" +- +- +-using librados::Rados; +-using librados::IoCtx; +- +-namespace rbd { +-namespace cache { +- +-typedef shared_ptr<librados::Rados> RadosRef; +-typedef shared_ptr<librados::IoCtx> IoCtxRef; +- +-class ObjectCacheStore +-{ +- public: +- ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); +- ~ObjectCacheStore(); +- +- int init(bool reset); +- +- int shutdown(); +- +- int lookup_object(std::string pool_name, std::string object_name); +- +- int init_cache(std::string vol_name, uint64_t vol_size); +- +- int lock_cache(std::string vol_name); +- +- private: +- void evict_thread_body(); +- int evict_objects(); +- +- int do_promote(std::string pool_name, std::string object_name); +- +- int promote_object(librados::IoCtx*, std::string object_name, +- librados::bufferlist* read_buf, +- uint64_t length); +- +- CephContext *m_cct; +- ContextWQ* m_work_queue; +- RadosRef m_rados; +- +- +- std::map<std::string, librados::IoCtx*> m_ioctxs; +- +- librbd::cache::SyncFile *m_cache_file; +- +- Policy* m_policy; +- std::thread* evict_thd; +- bool m_evict_go = false; +-}; +- +-} // namespace rbd +-} // namespace cache +-#endif +diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp +deleted file mode 100644 +index 711e3bd..0000000 +--- a/src/tools/rbd_cache/Policy.hpp ++++ /dev/null +@@ -1,30 +0,0 @@ +-#ifndef RBD_CACHE_POLICY_HPP +-#define RBD_CACHE_POLICY_HPP +- +-#include <list> +-#include <string> +- +-namespace rbd { +-namespace cache { +- +-enum CACHESTATUS { +- OBJ_CACHE_NONE = 0, +- OBJ_CACHE_PROMOTING, +- OBJ_CACHE_PROMOTED, +-}; +- +- +-class Policy { +-public: +- Policy(){} +- virtual ~Policy(){}; +- virtual CACHESTATUS lookup_object(std::string) = 0; +- virtual int evict_object(std::string&) = 0; +- virtual void update_status(std::string, CACHESTATUS) = 0; +- virtual CACHESTATUS get_status(std::string) = 0; +- virtual void get_evict_list(std::list<std::string>* obj_list) = 0; +-}; +- +-} // namespace cache +-} // namespace rbd +-#endif +diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp +deleted file mode 100644 +index e785de1..0000000 +--- a/src/tools/rbd_cache/SimplePolicy.hpp ++++ /dev/null +@@ -1,160 +0,0 @@ +-#ifndef RBD_CACHE_SIMPLE_POLICY_HPP +-#define RBD_CACHE_SIMPLE_POLICY_HPP +- +-#include "Policy.hpp" +-#include "include/lru.h" +-#include "common/RWLock.h" +-#include "common/Mutex.h" +- +-#include <vector> +-#include <unordered_map> +-#include <string> +- +-namespace rbd { +-namespace cache { +- +- +-class SimplePolicy : public Policy { +-public: +- SimplePolicy(uint64_t block_num, float watermark) +- : m_watermark(watermark), m_entry_count(block_num), +- m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), +- m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") +- { +- +- for(uint64_t i = 0; i < m_entry_count; i++) { +- m_free_list.push_back(new Entry()); +- } +- +- } +- +- ~SimplePolicy() { +- for(uint64_t i = 0; i < m_entry_count; i++) { +- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); +- delete entry; +- m_free_list.pop_front(); +- } +- } +- +- CACHESTATUS lookup_object(std::string cache_file_name) { +- +- //TODO(): check race condition +- RWLock::WLocker wlocker(m_cache_map_lock); +- +- auto entry_it = m_cache_map.find(cache_file_name); +- if(entry_it == m_cache_map.end()) { +- Mutex::Locker locker(m_free_list_lock); +- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); +- assert(entry != nullptr); +- m_free_list.pop_front(); +- entry->status = OBJ_CACHE_PROMOTING; +- +- m_cache_map[cache_file_name] = entry; +- +- return OBJ_CACHE_NONE; +- } +- +- Entry* entry = entry_it->second; +- +- if(entry->status == OBJ_CACHE_PROMOTED) { +- // touch it +- m_promoted_lru.lru_touch(entry); +- } +- +- return entry->status; +- } +- +- int evict_object(std::string& out_cache_file_name) { +- RWLock::WLocker locker(m_cache_map_lock); +- +- return 1; +- } +- +- // TODO(): simplify the logic +- void update_status(std::string file_name, CACHESTATUS new_status) { +- RWLock::WLocker locker(m_cache_map_lock); +- +- Entry* entry; +- auto entry_it = m_cache_map.find(file_name); +- +- // just check. +- if(new_status == OBJ_CACHE_PROMOTING) { +- assert(entry_it == m_cache_map.end()); +- } +- +- assert(entry_it != m_cache_map.end()); +- +- entry = entry_it->second; +- +- // promoting is done, so update it. +- if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { +- m_promoted_lru.lru_insert_top(entry); +- entry->status = new_status; +- return; +- } +- +- assert(0); +- } +- +- // get entry status +- CACHESTATUS get_status(std::string file_name) { +- RWLock::RLocker locker(m_cache_map_lock); +- auto entry_it = m_cache_map.find(file_name); +- if(entry_it == m_cache_map.end()) { +- return OBJ_CACHE_NONE; +- } +- +- return entry_it->second->status; +- } +- +- void get_evict_list(std::list<std::string>* obj_list) { +- RWLock::WLocker locker(m_cache_map_lock); +- // check free ratio, pop entries from LRU +- if (m_free_list.size() / m_entry_count < m_watermark) { +- int evict_num = 10; //TODO(): make this configurable +- for(int i = 0; i < evict_num; i++) { +- Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); +- if (entry == nullptr) { +- continue; +- } +- std::string file_name = entry->cache_file_name; +- obj_list->push_back(file_name); +- +- auto entry_it = m_cache_map.find(file_name); +- m_cache_map.erase(entry_it); +- +- //mark this entry as free +- entry->status = OBJ_CACHE_NONE; +- Mutex::Locker locker(m_free_list_lock); +- m_free_list.push_back(entry); +- } +- } +- } +- +-private: +- +- class Entry : public LRUObject { +- public: +- CACHESTATUS status; +- Entry() : status(OBJ_CACHE_NONE){} +- std::string cache_file_name; +- void encode(bufferlist &bl){} +- void decode(bufferlist::iterator &it){} +- }; +- +- float m_watermark; +- uint64_t m_entry_count; +- +- std::unordered_map<std::string, Entry*> m_cache_map; +- RWLock m_cache_map_lock; +- +- std::deque<Entry*> m_free_list; +- Mutex m_free_list_lock; +- +- LRU m_promoted_lru; // include promoted, using status. +- +-}; +- +-} // namespace cache +-} // namespace rbd +-#endif +diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc +deleted file mode 100644 +index d604760..0000000 +--- a/src/tools/rbd_cache/main.cc ++++ /dev/null +@@ -1,85 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "common/ceph_argparse.h" +-#include "common/config.h" +-#include "common/debug.h" +-#include "common/errno.h" +-#include "global/global_init.h" +-#include "global/signal_handler.h" +-#include "CacheController.h" +- +-#include <vector> +- +-rbd::cache::CacheController *cachectl = nullptr; +- +-void usage() { +- std::cout << "usage: cache controller [options...]" << std::endl; +- std::cout << "options:\n"; +- std::cout << " -m monaddress[:port] connect to specified monitor\n"; +- std::cout << " --keyring=<path> path to keyring for local cluster\n"; +- std::cout << " --log-file=<logfile> file to log debug output\n"; +- std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; +- generic_server_usage(); +-} +- +-static void handle_signal(int signum) +-{ +- if (cachectl) +- cachectl->handle_signal(signum); +-} +- +-int main(int argc, const char **argv) +-{ +- std::vector<const char*> args; +- env_to_vec(args); +- argv_to_vec(argc, argv, args); +- +- auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, +- CODE_ENVIRONMENT_DAEMON, +- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); +- +- for (auto i = args.begin(); i != args.end(); ++i) { +- if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { +- usage(); +- return EXIT_SUCCESS; +- } +- } +- +- if (g_conf()->daemonize) { +- global_init_daemonize(g_ceph_context); +- } +- g_ceph_context->enable_perf_counter(); +- +- common_init_finish(g_ceph_context); +- +- init_async_signal_handler(); +- register_async_signal_handler(SIGHUP, sighup_handler); +- register_async_signal_handler_oneshot(SIGINT, handle_signal); +- register_async_signal_handler_oneshot(SIGTERM, handle_signal); +- +- std::vector<const char*> cmd_args; +- argv_to_vec(argc, argv, cmd_args); +- +- // disable unnecessary librbd cache +- g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); +- +- cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args); +- int r = cachectl->init(); +- if (r < 0) { +- std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; +- goto cleanup; +- } +- +- cachectl->run(); +- +- cleanup: +- unregister_async_signal_handler(SIGHUP, sighup_handler); +- unregister_async_signal_handler(SIGINT, handle_signal); +- unregister_async_signal_handler(SIGTERM, handle_signal); +- shutdown_async_signal_handler(); +- +- delete cachectl; +- +- return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; +-} +-- +2.7.4 + diff --git a/src/ceph/ceph.rc b/src/ceph/ceph.rc index 6c5f95e..9484318 100644 --- a/src/ceph/ceph.rc +++ b/src/ceph/ceph.rc @@ -10,4 +10,15 @@ OPTION="" # "0001-crypto-add-openssl-support-for-RGW-encryption.patch" \ # "0001-add-QAT-support.patch" \ # ) -SOURCES=() +SOURCES=( + "0001-librbd-shared-persistent-read-only-rbd-cache.patch" \ + "0002-librbd-cleanup-rbd-shared-RO-cache.patch" \ + "0003-librbd-fix-bufferlist-point.patch" \ + "0004-librbd-fix-lookup-object-return.patch" \ + "0005-librbd-fix-conf-get_val.patch" \ + "0006-librbd-LRU-policy-based-eviction.patch" \ + "0007-librbd-cleanup-policy-based-promotion-eviction.patch" \ + "0008-librbd-implement-async-cache-lookup-and-read.patch" \ + "0009-librbd-clean-up-on-rbd-shared-cache.patch" \ + "0010-librbd-new-namespace-ceph-immutable-obj-cache.patch" \ +) |