diff options
Diffstat (limited to 'src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch')
-rw-r--r-- | src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch | 3510 |
1 files changed, 3510 insertions, 0 deletions
diff --git a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch new file mode 100644 index 0000000..46040e6 --- /dev/null +++ b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch @@ -0,0 +1,3510 @@ +From 9a097084d06e7186206b43d9c81d1f648791d7a4 Mon Sep 17 00:00:00 2001 +From: Yuan Zhou <yuan.zhou@intel.com> +Date: Fri, 7 Sep 2018 08:29:51 +0800 +Subject: [PATCH 10/10] librbd: new namespace ceph immutable obj cache + +clean up class/func names to use the new namespace + +Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> +--- + src/common/options.cc | 2 +- + src/common/subsys.h | 3 +- + src/librbd/CMakeLists.txt | 3 +- + .../SharedPersistentObjectCacherObjectDispatch.cc | 175 ---------------- + .../SharedPersistentObjectCacherObjectDispatch.h | 133 ------------ + src/librbd/cache/SharedReadOnlyObjectDispatch.cc | 170 +++++++++++++++ + src/librbd/cache/SharedReadOnlyObjectDispatch.h | 126 ++++++++++++ + src/librbd/image/OpenRequest.cc | 4 +- + src/tools/CMakeLists.txt | 2 +- + .../ceph_immutable_object_cache/CMakeLists.txt | 11 + + .../ceph_immutable_object_cache/CacheClient.cc | 205 ++++++++++++++++++ + .../ceph_immutable_object_cache/CacheClient.h | 53 +++++ + .../ceph_immutable_object_cache/CacheController.cc | 117 +++++++++++ + .../ceph_immutable_object_cache/CacheController.h | 53 +++++ + .../ceph_immutable_object_cache/CacheServer.cc | 99 +++++++++ + .../ceph_immutable_object_cache/CacheServer.h | 54 +++++ + .../ceph_immutable_object_cache/CacheSession.cc | 115 +++++++++++ + .../ceph_immutable_object_cache/CacheSession.h | 58 ++++++ + .../ObjectCacheStore.cc | 172 ++++++++++++++++ + .../ceph_immutable_object_cache/ObjectCacheStore.h | 70 +++++++ + src/tools/ceph_immutable_object_cache/Policy.hpp | 33 +++ + .../ceph_immutable_object_cache/SimplePolicy.hpp | 163 +++++++++++++++ + .../ceph_immutable_object_cache/SocketCommon.h | 54 +++++ + src/tools/ceph_immutable_object_cache/main.cc | 85 ++++++++ + src/tools/rbd_cache/CMakeLists.txt | 9 - + src/tools/rbd_cache/CacheController.cc | 116 ----------- + src/tools/rbd_cache/CacheController.h | 54 ----- + src/tools/rbd_cache/CacheControllerSocket.hpp | 228 -------------------- + .../rbd_cache/CacheControllerSocketClient.hpp | 229 --------------------- + src/tools/rbd_cache/CacheControllerSocketCommon.h | 62 ------ + src/tools/rbd_cache/ObjectCacheStore.cc | 172 ---------------- + src/tools/rbd_cache/ObjectCacheStore.h | 70 ------- + src/tools/rbd_cache/Policy.hpp | 30 --- + src/tools/rbd_cache/SimplePolicy.hpp | 160 -------------- + src/tools/rbd_cache/main.cc | 85 -------- + 35 files changed, 1646 insertions(+), 1529 deletions(-) + delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc + delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h + create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.cc + create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.h + create mode 100644 src/tools/ceph_immutable_object_cache/CMakeLists.txt + create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.cc + create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.h + create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.cc + create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.h + create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.cc + create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.h + create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.cc + create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.h + create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc + create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.h + create mode 100644 src/tools/ceph_immutable_object_cache/Policy.hpp + create mode 100644 src/tools/ceph_immutable_object_cache/SimplePolicy.hpp + create mode 100644 src/tools/ceph_immutable_object_cache/SocketCommon.h + create mode 100644 src/tools/ceph_immutable_object_cache/main.cc + delete mode 100644 src/tools/rbd_cache/CMakeLists.txt + delete mode 100644 src/tools/rbd_cache/CacheController.cc + delete mode 100644 src/tools/rbd_cache/CacheController.h + delete mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp + delete mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp + delete mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h + delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc + delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.h + delete mode 100644 src/tools/rbd_cache/Policy.hpp + delete mode 100644 src/tools/rbd_cache/SimplePolicy.hpp + delete mode 100644 src/tools/rbd_cache/main.cc + +diff --git a/src/common/options.cc b/src/common/options.cc +index 3172744..bf00aab 100644 +--- a/src/common/options.cc ++++ b/src/common/options.cc +@@ -6358,7 +6358,7 @@ static std::vector<Option> get_rbd_options() { + .set_description("time in seconds for detecting a hung thread"), + + Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) +- .set_default(true) ++ .set_default(false) + .set_description("whether to enable shared ssd caching"), + + Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED) +diff --git a/src/common/subsys.h b/src/common/subsys.h +index bdd2d0e..5b532c1 100644 +--- a/src/common/subsys.h ++++ b/src/common/subsys.h +@@ -36,9 +36,10 @@ SUBSYS(objecter, 0, 1) + SUBSYS(rados, 0, 5) + SUBSYS(rbd, 0, 5) + SUBSYS(rbd_mirror, 0, 5) +-SUBSYS(rbd_replay, 0, 5) + SUBSYS(journaler, 0, 5) + SUBSYS(objectcacher, 0, 5) ++SUBSYS(immutable_obj_cache, 0, 5) ++SUBSYS(rbd_replay, 0, 5) + SUBSYS(client, 0, 5) + SUBSYS(osd, 1, 5) + SUBSYS(optracker, 0, 5) +diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt +index 540ee78..c9bfb6f 100644 +--- a/src/librbd/CMakeLists.txt ++++ b/src/librbd/CMakeLists.txt +@@ -32,7 +32,7 @@ set(librbd_internal_srcs + api/Snapshot.cc + cache/ImageWriteback.cc + cache/ObjectCacherObjectDispatch.cc +- cache/SharedPersistentObjectCacherObjectDispatch.cc ++ cache/SharedReadOnlyObjectDispatch.cc + cache/SharedPersistentObjectCacher.cc + cache/SharedPersistentObjectCacherFile.cc + deep_copy/ImageCopyRequest.cc +@@ -125,6 +125,7 @@ set(librbd_internal_srcs + trash/MoveRequest.cc + watcher/Notifier.cc + watcher/RewatchRequest.cc ++ ${CMAKE_SOURCE_DIR}/src/tools/ceph_immutable_object_cache/CacheClient.cc + ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) + + add_library(rbd_api STATIC librbd.cc) +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc +deleted file mode 100644 +index 7cbc019..0000000 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc ++++ /dev/null +@@ -1,175 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h" +-#include "common/WorkQueue.h" +-#include "librbd/ImageCtx.h" +-#include "librbd/Journal.h" +-#include "librbd/Utils.h" +-#include "librbd/LibrbdWriteback.h" +-#include "librbd/io/ObjectDispatchSpec.h" +-#include "librbd/io/ObjectDispatcher.h" +-#include "librbd/io/Utils.h" +-#include "osd/osd_types.h" +-#include "osdc/WritebackHandler.h" +-#include <vector> +- +-#define dout_subsys ceph_subsys_rbd +-#undef dout_prefix +-#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \ +- << this << " " << __func__ << ": " +- +-namespace librbd { +-namespace cache { +- +-template <typename I> +-SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch( +- I* image_ctx) : m_image_ctx(image_ctx) { +-} +- +-template <typename I> +-SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { +- delete m_object_store; +- delete m_cache_client; +-} +- +-// TODO if connect fails, init will return error to high layer. +-template <typename I> +-void SharedPersistentObjectCacherObjectDispatch<I>::init() { +- auto cct = m_image_ctx->cct; +- ldout(cct, 5) << dendl; +- +- if (m_image_ctx->parent != nullptr) { +- //TODO(): should we cover multi-leveled clone? +- ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; +- return; +- } +- +- ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; +- +- std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); +- m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(), +- ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); +- +- int ret = m_cache_client->connect(); +- if (ret < 0) { +- ldout(cct, 5) << "SRO cache client fail to connect with local controller: " +- << "please start rbd-cache daemon" +- << dendl; +- } else { +- ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " +- << "name = " << m_image_ctx->id +- << dendl; +- +- ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), +- m_image_ctx->id, m_image_ctx->size); +- +- if (ret >= 0) { +- // add ourself to the IO object dispatcher chain +- m_image_ctx->io_object_dispatcher->register_object_dispatch(this); +- } +- } +-} +- +-template <typename I> +-bool SharedPersistentObjectCacherObjectDispatch<I>::read( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- uint64_t object_len, librados::snap_t snap_id, int op_flags, +- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, +- io::ExtentMap* extent_map, int* object_dispatch_flags, +- io::DispatchResult* dispatch_result, Context** on_finish, +- Context* on_dispatched) { +- +- // IO chained in reverse order +- +- // Now, policy is : when session have any error, later all read will dispatched to rados layer. +- if(!m_cache_client->is_session_work()) { +- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; +- on_dispatched->complete(0); +- return true; +- // TODO : domain socket have error, all read operation will dispatched to rados layer. +- } +- +- auto cct = m_image_ctx->cct; +- ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" +- << object_len << dendl; +- +- +- on_dispatched = util::create_async_context_callback(*m_image_ctx, +- on_dispatched); +- auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { +- handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); +- }); +- +- if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { +- m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), +- m_image_ctx->id, oid, ctx); +- } +- return true; +-} +- +-template <typename I> +-int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( +- bool cache, +- const std::string &oid, uint64_t object_off, uint64_t object_len, +- ceph::bufferlist* read_data, io::DispatchResult* dispatch_result, +- Context* on_dispatched) { +- // IO chained in reverse order +- auto cct = m_image_ctx->cct; +- ldout(cct, 20) << dendl; +- +- // try to read from parent image +- if (cache) { +- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); +- //int r = object_len; +- if (r != 0) { +- *dispatch_result = io::DISPATCH_RESULT_COMPLETE; +- //TODO(): complete in syncfile +- on_dispatched->complete(r); +- ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl; +- return true; +- } +- } else { +- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; +- on_dispatched->complete(0); +- ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl; +- return false; +- } +-} +-template <typename I> +-void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { +- auto cct = m_image_ctx->cct; +- ldout(cct, 20) << dendl; +- +- rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); +- +- switch (io_ctx->type) { +- case rbd::cache::RBDSC_REGISTER_REPLY: { +- // open cache handler for volume +- ldout(cct, 20) << "SRO cache client open cache handler" << dendl; +- m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); +- +- break; +- } +- case rbd::cache::RBDSC_READ_REPLY: { +- ldout(cct, 20) << "SRO cache client start to read cache" << dendl; +- //TODO(): should call read here +- +- break; +- } +- case rbd::cache::RBDSC_READ_RADOS: { +- ldout(cct, 20) << "SRO cache client start to read rados" << dendl; +- //TODO(): should call read here +- +- break; +- } +- default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; +- break; +- +- } +-} +- +-} // namespace cache +-} // namespace librbd +- +-template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; +diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h +deleted file mode 100644 +index 5685244..0000000 +--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h ++++ /dev/null +@@ -1,133 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H +-#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H +- +-#include "librbd/io/ObjectDispatchInterface.h" +-#include "common/Mutex.h" +-#include "osdc/ObjectCacher.h" +-#include "tools/rbd_cache/CacheControllerSocketClient.hpp" +-#include "SharedPersistentObjectCacher.h" +- +-struct WritebackHandler; +- +-namespace librbd { +- +-class ImageCtx; +- +-namespace cache { +- +-/** +- * Facade around the OSDC object cacher to make it align with +- * the object dispatcher interface +- */ +-template <typename ImageCtxT = ImageCtx> +-class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface { +-public: +- static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) { +- return new SharedPersistentObjectCacherObjectDispatch(image_ctx); +- } +- +- SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx); +- ~SharedPersistentObjectCacherObjectDispatch() override; +- +- io::ObjectDispatchLayer get_object_dispatch_layer() const override { +- return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; +- } +- +- void init(); +- void shut_down(Context* on_finish) { +- m_image_ctx->op_work_queue->queue(on_finish, 0); +- } +- +- bool read( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- uint64_t object_len, librados::snap_t snap_id, int op_flags, +- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, +- io::ExtentMap* extent_map, int* object_dispatch_flags, +- io::DispatchResult* dispatch_result, Context** on_finish, +- Context* on_dispatched) override; +- +- bool discard( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- uint64_t object_len, const ::SnapContext &snapc, int discard_flags, +- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, +- uint64_t* journal_tid, io::DispatchResult* dispatch_result, +- Context** on_finish, Context* on_dispatched) { +- return false; +- } +- +- bool write( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, +- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, +- uint64_t* journal_tid, io::DispatchResult* dispatch_result, +- Context** on_finish, Context* on_dispatched) { +- return false; +- } +- +- bool write_same( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- uint64_t object_len, io::Extents&& buffer_extents, +- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, +- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, +- uint64_t* journal_tid, io::DispatchResult* dispatch_result, +- Context** on_finish, Context* on_dispatched) { +- return false; +- } +- +- bool compare_and_write( +- const std::string &oid, uint64_t object_no, uint64_t object_off, +- ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, +- const ::SnapContext &snapc, int op_flags, +- const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, +- int* object_dispatch_flags, uint64_t* journal_tid, +- io::DispatchResult* dispatch_result, Context** on_finish, +- Context* on_dispatched) { +- return false; +- } +- +- bool flush( +- io::FlushSource flush_source, const ZTracer::Trace &parent_trace, +- io::DispatchResult* dispatch_result, Context** on_finish, +- Context* on_dispatched) { +- return false; +- } +- +- bool invalidate_cache(Context* on_finish) { +- return false; +- } +- +- bool reset_existence_cache(Context* on_finish) { +- return false; +- } +- +- void extent_overwritten( +- uint64_t object_no, uint64_t object_off, uint64_t object_len, +- uint64_t journal_tid, uint64_t new_journal_tid) { +- } +- +- SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; +- +-private: +- +- int handle_read_cache( +- bool cache, +- const std::string &oid, uint64_t object_off, +- uint64_t object_len, ceph::bufferlist* read_data, +- io::DispatchResult* dispatch_result, +- Context* on_dispatched); +- void client_handle_request(std::string msg); +- +- ImageCtxT* m_image_ctx; +- +- rbd::cache::CacheClient *m_cache_client = nullptr; +-}; +- +-} // namespace cache +-} // namespace librbd +- +-extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; +- +-#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H +diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.cc b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc +new file mode 100644 +index 0000000..23c7dbe +--- /dev/null ++++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc +@@ -0,0 +1,170 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/WorkQueue.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/Journal.h" ++#include "librbd/Utils.h" ++#include "librbd/LibrbdWriteback.h" ++#include "librbd/io/ObjectDispatchSpec.h" ++#include "librbd/io/ObjectDispatcher.h" ++#include "librbd/io/Utils.h" ++#include "librbd/cache/SharedReadOnlyObjectDispatch.h" ++#include "osd/osd_types.h" ++#include "osdc/WritebackHandler.h" ++ ++#include <vector> ++ ++#define dout_subsys ceph_subsys_rbd ++#undef dout_prefix ++#define dout_prefix *_dout << "librbd::cache::SharedReadOnlyObjectDispatch: " \ ++ << this << " " << __func__ << ": " ++ ++namespace librbd { ++namespace cache { ++ ++template <typename I> ++SharedReadOnlyObjectDispatch<I>::SharedReadOnlyObjectDispatch( ++ I* image_ctx) : m_image_ctx(image_ctx) { ++} ++ ++template <typename I> ++SharedReadOnlyObjectDispatch<I>::~SharedReadOnlyObjectDispatch() { ++ delete m_object_store; ++ delete m_cache_client; ++} ++ ++// TODO if connect fails, init will return error to high layer. ++template <typename I> ++void SharedReadOnlyObjectDispatch<I>::init() { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 5) << dendl; ++ ++ if (m_image_ctx->parent != nullptr) { ++ //TODO(): should we cover multi-leveled clone? ++ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; ++ return; ++ } ++ ++ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; ++ ++ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); ++ m_cache_client = new ceph::immutable_obj_cache::CacheClient(controller_path.c_str(), ++ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); ++ ++ int ret = m_cache_client->connect(); ++ if (ret < 0) { ++ ldout(cct, 5) << "SRO cache client fail to connect with local controller: " ++ << "please start rbd-cache daemon" ++ << dendl; ++ } else { ++ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " ++ << "name = " << m_image_ctx->id ++ << dendl; ++ ++ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), ++ m_image_ctx->id, m_image_ctx->size); ++ ++ if (ret >= 0) { ++ // add ourself to the IO object dispatcher chain ++ m_image_ctx->io_object_dispatcher->register_object_dispatch(this); ++ } ++ } ++} ++ ++template <typename I> ++bool SharedReadOnlyObjectDispatch<I>::read( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, librados::snap_t snap_id, int op_flags, ++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, ++ io::ExtentMap* extent_map, int* object_dispatch_flags, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" ++ << object_len << dendl; ++ ++ // if any session fails, later reads will go to rados ++ if(!m_cache_client->is_session_work()) { ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ return true; ++ // TODO(): fix domain socket error ++ } ++ ++ auto ctx = new FunctionContext([this, oid, object_off, object_len, ++ read_data, dispatch_result, on_dispatched](bool cache) { ++ handle_read_cache(cache, oid, object_off, object_len, ++ read_data, dispatch_result, on_dispatched); ++ }); ++ ++ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { ++ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), ++ m_image_ctx->id, oid, ctx); ++ } ++ return true; ++} ++ ++template <typename I> ++int SharedReadOnlyObjectDispatch<I>::handle_read_cache( ++ bool cache, const std::string &oid, uint64_t object_off, ++ uint64_t object_len, ceph::bufferlist* read_data, ++ io::DispatchResult* dispatch_result, Context* on_dispatched) { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << dendl; ++ ++ // try to read from parent image ++ if (cache) { ++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); ++ //int r = object_len; ++ if (r != 0) { ++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; ++ //TODO(): complete in syncfile ++ on_dispatched->complete(r); ++ ldout(cct, 20) << "read cache: " << *dispatch_result <<dendl; ++ return true; ++ } ++ } else { ++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; ++ on_dispatched->complete(0); ++ ldout(cct, 20) << "read rados: " << *dispatch_result <<dendl; ++ return false; ++ } ++} ++template <typename I> ++void SharedReadOnlyObjectDispatch<I>::client_handle_request(std::string msg) { ++ auto cct = m_image_ctx->cct; ++ ldout(cct, 20) << dendl; ++ ++ ceph::immutable_obj_cache::rbdsc_req_type_t *io_ctx = (ceph::immutable_obj_cache::rbdsc_req_type_t*)(msg.c_str()); ++ ++ switch (io_ctx->type) { ++ case ceph::immutable_obj_cache::RBDSC_REGISTER_REPLY: { ++ // open cache handler for volume ++ ldout(cct, 20) << "SRO cache client open cache handler" << dendl; ++ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); ++ ++ break; ++ } ++ case ceph::immutable_obj_cache::RBDSC_READ_REPLY: { ++ ldout(cct, 20) << "SRO cache client start to read cache" << dendl; ++ //TODO(): should call read here ++ ++ break; ++ } ++ case ceph::immutable_obj_cache::RBDSC_READ_RADOS: { ++ ldout(cct, 20) << "SRO cache client start to read rados" << dendl; ++ //TODO(): should call read here ++ ++ break; ++ } ++ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; ++ break; ++ ++ } ++} ++ ++} // namespace cache ++} // namespace librbd ++ ++template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>; +diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.h b/src/librbd/cache/SharedReadOnlyObjectDispatch.h +new file mode 100644 +index 0000000..9b56da9 +--- /dev/null ++++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.h +@@ -0,0 +1,126 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H ++#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H ++ ++#include "common/Mutex.h" ++#include "SharedPersistentObjectCacher.h" ++#include "librbd/io/ObjectDispatchInterface.h" ++#include "tools/ceph_immutable_object_cache/CacheClient.h" ++ ++ ++namespace librbd { ++ ++class ImageCtx; ++ ++namespace cache { ++ ++template <typename ImageCtxT = ImageCtx> ++class SharedReadOnlyObjectDispatch : public io::ObjectDispatchInterface { ++public: ++ static SharedReadOnlyObjectDispatch* create(ImageCtxT* image_ctx) { ++ return new SharedReadOnlyObjectDispatch(image_ctx); ++ } ++ ++ SharedReadOnlyObjectDispatch(ImageCtxT* image_ctx); ++ ~SharedReadOnlyObjectDispatch() override; ++ ++ io::ObjectDispatchLayer get_object_dispatch_layer() const override { ++ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; ++ } ++ ++ void init(); ++ void shut_down(Context* on_finish) { ++ m_image_ctx->op_work_queue->queue(on_finish, 0); ++ } ++ ++ bool read( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, librados::snap_t snap_id, int op_flags, ++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, ++ io::ExtentMap* extent_map, int* object_dispatch_flags, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) override; ++ ++ bool discard( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, const ::SnapContext &snapc, int discard_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool write( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool write_same( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ uint64_t object_len, io::Extents&& buffer_extents, ++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, ++ uint64_t* journal_tid, io::DispatchResult* dispatch_result, ++ Context** on_finish, Context* on_dispatched) { ++ return false; ++ } ++ ++ bool compare_and_write( ++ const std::string &oid, uint64_t object_no, uint64_t object_off, ++ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, ++ const ::SnapContext &snapc, int op_flags, ++ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, ++ int* object_dispatch_flags, uint64_t* journal_tid, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ return false; ++ } ++ ++ bool flush( ++ io::FlushSource flush_source, const ZTracer::Trace &parent_trace, ++ io::DispatchResult* dispatch_result, Context** on_finish, ++ Context* on_dispatched) { ++ return false; ++ } ++ ++ bool invalidate_cache(Context* on_finish) { ++ return false; ++ } ++ ++ bool reset_existence_cache(Context* on_finish) { ++ return false; ++ } ++ ++ void extent_overwritten( ++ uint64_t object_no, uint64_t object_off, uint64_t object_len, ++ uint64_t journal_tid, uint64_t new_journal_tid) { ++ } ++ ++private: ++ ++ int handle_read_cache( ++ bool cache, ++ const std::string &oid, uint64_t object_off, ++ uint64_t object_len, ceph::bufferlist* read_data, ++ io::DispatchResult* dispatch_result, ++ Context* on_dispatched); ++ void client_handle_request(std::string msg); ++ ++ ImageCtxT* m_image_ctx; ++ ++ ceph::immutable_obj_cache::CacheClient *m_cache_client = nullptr; ++ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; ++}; ++ ++} // namespace cache ++} // namespace librbd ++ ++extern template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>; ++ ++#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H +diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc +index 30a7b66..57ce92f 100644 +--- a/src/librbd/image/OpenRequest.cc ++++ b/src/librbd/image/OpenRequest.cc +@@ -8,7 +8,7 @@ + #include "librbd/ImageCtx.h" + #include "librbd/Utils.h" + #include "librbd/cache/ObjectCacherObjectDispatch.h" +-#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc" ++#include "librbd/cache/SharedReadOnlyObjectDispatch.cc" + #include "librbd/image/CloseRequest.h" + #include "librbd/image/RefreshRequest.h" + #include "librbd/image/SetSnapRequest.h" +@@ -457,7 +457,7 @@ Context *OpenRequest<I>::send_init_cache(int *result) { + // enable Shared Read-only cache for parent image + if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) { + ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl; +- auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx); ++ auto sro_cache = cache::SharedReadOnlyObjectDispatch<I>::create(m_image_ctx); + sro_cache->init(); + } + +diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt +index 72ab342..f7c5872 100644 +--- a/src/tools/CMakeLists.txt ++++ b/src/tools/CMakeLists.txt +@@ -99,7 +99,6 @@ endif(WITH_CEPHFS) + if(WITH_RBD) + add_subdirectory(rbd) + add_subdirectory(rbd_mirror) +- add_subdirectory(rbd_cache) + if(LINUX) + add_subdirectory(rbd_nbd) + endif() +@@ -108,4 +107,5 @@ if(WITH_RBD) + endif() + endif(WITH_RBD) + ++add_subdirectory(ceph_immutable_object_cache) + add_subdirectory(ceph-dencoder) +diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt +new file mode 100644 +index 0000000..c7c7af3 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CMakeLists.txt +@@ -0,0 +1,11 @@ ++add_executable(ceph-immutable-object-cache ++ ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc ++ ObjectCacheStore.cc ++ CacheController.cc ++ CacheServer.cc ++ CacheSession.cc ++ main.cc) ++target_link_libraries(ceph-immutable-object-cache ++ librados ++ global) ++install(TARGETS ceph-immutable-object-cache DESTINATION bin) +diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc +new file mode 100644 +index 0000000..a7116bf +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheClient.cc +@@ -0,0 +1,205 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "CacheClient.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \ ++ << __func__ << ": " ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++ CacheClient::CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) ++ : m_io_service_work(m_io_service), ++ m_dm_socket(m_io_service), ++ m_client_process_msg(processmsg), ++ m_ep(stream_protocol::endpoint(file)), ++ m_session_work(false), ++ cct(ceph_ctx) ++ { ++ // TODO wrapper io_service ++ std::thread thd([this](){m_io_service.run();}); ++ thd.detach(); ++ } ++ ++ void CacheClient::run(){ ++ } ++ ++ bool CacheClient::is_session_work() { ++ return m_session_work.load() == true; ++ } ++ ++ // just when error occur, call this method. ++ void CacheClient::close() { ++ m_session_work.store(false); ++ boost::system::error_code close_ec; ++ m_dm_socket.close(close_ec); ++ if(close_ec) { ++ ldout(cct, 20) << "close: " << close_ec.message() << dendl; ++ } ++ ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl; ++ } ++ ++ int CacheClient::connect() { ++ boost::system::error_code ec; ++ m_dm_socket.connect(m_ep, ec); ++ if(ec) { ++ if(ec == boost::asio::error::connection_refused) { ++ ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. " ++ << "Now data will be read from ceph cluster " << dendl; ++ } else { ++ ldout(cct, 20) << "connect: " << ec.message() << dendl; ++ } ++ ++ if(m_dm_socket.is_open()) { ++ // Set to indicate what error occurred, if any. ++ // Note that, even if the function indicates an error, ++ // the underlying descriptor is closed. ++ boost::system::error_code close_ec; ++ m_dm_socket.close(close_ec); ++ if(close_ec) { ++ ldout(cct, 20) << "close: " << close_ec.message() << dendl; ++ } ++ } ++ return -1; ++ } ++ ++ ldout(cct, 20) <<"connect success"<< dendl; ++ ++ return 0; ++ } ++ ++ int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { ++ // cache controller will init layout ++ rbdsc_req_type_t *message = new rbdsc_req_type_t(); ++ message->type = RBDSC_REGISTER; ++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); ++ memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); ++ message->vol_size = vol_size; ++ message->offset = 0; ++ message->length = 0; ++ ++ uint64_t ret; ++ boost::system::error_code ec; ++ ++ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); ++ if(ec) { ++ ldout(cct, 20) << "write fails : " << ec.message() << dendl; ++ return -1; ++ } ++ ++ if(ret != message->size()) { ++ ldout(cct, 20) << "write fails : ret != send_bytes " << dendl; ++ return -1; ++ } ++ ++ // hard code TODO ++ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); ++ if(ec == boost::asio::error::eof) { ++ ldout(cct, 20) << "recv eof" << dendl; ++ return -1; ++ } ++ ++ if(ec) { ++ ldout(cct, 20) << "write fails : " << ec.message() << dendl; ++ return -1; ++ } ++ ++ if(ret != RBDSC_MSG_LEN) { ++ ldout(cct, 20) << "write fails : ret != receive bytes " << dendl; ++ return -1; ++ } ++ ++ m_client_process_msg(std::string(m_recv_buffer, ret)); ++ ++ delete message; ++ ++ ldout(cct, 20) << "register volume success" << dendl; ++ ++ // TODO ++ m_session_work.store(true); ++ ++ return 0; ++ } ++ ++ // if occur any error, we just return false. Then read from rados. ++ int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { ++ rbdsc_req_type_t *message = new rbdsc_req_type_t(); ++ message->type = RBDSC_READ; ++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); ++ memcpy(message->vol_name, object_id.c_str(), object_id.size()); ++ message->vol_size = 0; ++ message->offset = 0; ++ message->length = 0; ++ ++ boost::asio::async_write(m_dm_socket, ++ boost::asio::buffer((char*)message, message->size()), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ [this, on_finish, message](const boost::system::error_code& err, size_t cb) { ++ delete message; ++ if(err) { ++ ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ if(cb != RBDSC_MSG_LEN) { ++ ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ get_result(on_finish); ++ }); ++ ++ return 0; ++ } ++ ++ void CacheClient::get_result(Context* on_finish) { ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ [this, on_finish](const boost::system::error_code& err, size_t cb) { ++ if(err == boost::asio::error::eof) { ++ ldout(cct, 20) << "get_result: ack is EOF." << dendl; ++ close(); ++ on_finish->complete(false); ++ return; ++ } ++ if(err) { ++ ldout(cct, 20) << "get_result: async_read fails:" << err.message() << dendl; ++ close(); ++ on_finish->complete(false); // TODO replace this assert with some metohds. ++ return; ++ } ++ if (cb != RBDSC_MSG_LEN) { ++ close(); ++ ldout(cct, 20) << "get_result: in-complete ack." << dendl; ++ on_finish->complete(false); // TODO: replace this assert with some methods. ++ } ++ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); ++ ++ // TODO: re-occur yuan's bug ++ if(io_ctx->type == RBDSC_READ) { ++ ldout(cct, 20) << "get rbdsc_read... " << dendl; ++ assert(0); ++ } ++ ++ if (io_ctx->type == RBDSC_READ_REPLY) { ++ on_finish->complete(true); ++ return; ++ } else { ++ on_finish->complete(false); ++ return; ++ } ++ }); ++ } ++ ++} // namespace immutable_obj_cache ++} // namespace ceph +diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h +new file mode 100644 +index 0000000..d82ab8f +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheClient.h +@@ -0,0 +1,53 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_CLIENT_H ++#define CEPH_CACHE_CLIENT_H ++ ++#include <atomic> ++#include <boost/asio.hpp> ++#include <boost/bind.hpp> ++#include <boost/asio/error.hpp> ++#include <boost/algorithm/string.hpp> ++#include "librbd/ImageCtx.h" ++#include "include/assert.h" ++#include "include/Context.h" ++#include "SocketCommon.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class CacheClient { ++public: ++ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx); ++ void run(); ++ bool is_session_work(); ++ ++ void close(); ++ int connect(); ++ ++ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size); ++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish); ++ void get_result(Context* on_finish); ++ ++private: ++ boost::asio::io_service m_io_service; ++ boost::asio::io_service::work m_io_service_work; ++ stream_protocol::socket m_dm_socket; ++ ClientProcessMsg m_client_process_msg; ++ stream_protocol::endpoint m_ep; ++ char m_recv_buffer[1024]; ++ ++ // atomic modfiy for this variable. ++ // thread 1 : asio callback thread modify it. ++ // thread 2 : librbd read it. ++ std::atomic<bool> m_session_work; ++ CephContext* cct; ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc +new file mode 100644 +index 0000000..cb636d2 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheController.cc +@@ -0,0 +1,117 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "CacheController.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \ ++ << __func__ << ": " ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class ThreadPoolSingleton : public ThreadPool { ++public: ++ ContextWQ *op_work_queue; ++ ++ explicit ThreadPoolSingleton(CephContext *cct) ++ : ThreadPool(cct, "ceph::cache::thread_pool", "tp_librbd_cache", 32, ++ "pcache_threads"), ++ op_work_queue(new ContextWQ("ceph::pcache_op_work_queue", ++ cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), ++ this)) { ++ start(); ++ } ++ ~ThreadPoolSingleton() override { ++ op_work_queue->drain(); ++ delete op_work_queue; ++ ++ stop(); ++ } ++}; ++ ++ ++CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): ++ m_args(args), m_cct(cct) { ++ ++} ++ ++CacheController::~CacheController() { ++ ++} ++ ++int CacheController::init() { ++ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( ++ "ceph::cache::thread_pool", false, m_cct); ++ pcache_op_work_queue = thread_pool_singleton->op_work_queue; ++ ++ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); ++ //TODO(): make this configurable ++ int r = m_object_cache_store->init(true); ++ if (r < 0) { ++ lderr(m_cct) << "init error\n" << dendl; ++ } ++ return r; ++} ++ ++int CacheController::shutdown() { ++ int r = m_object_cache_store->shutdown(); ++ return r; ++} ++ ++void CacheController::handle_signal(int signum){} ++ ++void CacheController::run() { ++ try { ++ //TODO(): use new socket path ++ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); ++ std::remove(controller_path.c_str()); ++ ++ m_cache_server = new CacheServer(controller_path, ++ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); ++ m_cache_server->run(); ++ } catch (std::exception& e) { ++ lderr(m_cct) << "Exception: " << e.what() << dendl; ++ } ++} ++ ++void CacheController::handle_request(uint64_t session_id, std::string msg){ ++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); ++ ++ int ret = 0; ++ ++ switch (io_ctx->type) { ++ case RBDSC_REGISTER: { ++ // init cache layout for volume ++ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); ++ io_ctx->type = RBDSC_REGISTER_REPLY; ++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); ++ ++ break; ++ } ++ case RBDSC_READ: { ++ // lookup object in local cache store ++ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); ++ if (ret < 0) { ++ io_ctx->type = RBDSC_READ_RADOS; ++ } else { ++ io_ctx->type = RBDSC_READ_REPLY; ++ } ++ if (io_ctx->type != RBDSC_READ_REPLY) { ++ assert(0); ++ } ++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); ++ ++ break; ++ } ++ ldout(m_cct, 5) << "can't recongize request" << dendl; ++ assert(0); // TODO replace it. ++ } ++} ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ ++ +diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h +new file mode 100644 +index 0000000..837fe36 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheController.h +@@ -0,0 +1,53 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_CONTROLLER_H ++#define CEPH_CACHE_CONTROLLER_H ++ ++#include "common/Formatter.h" ++#include "common/admin_socket.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "common/WorkQueue.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "include/assert.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++#include "CacheServer.h" ++#include "ObjectCacheStore.h" ++ ++#include <thread> ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class CacheController { ++ public: ++ CacheController(CephContext *cct, const std::vector<const char*> &args); ++ ~CacheController(); ++ ++ int init(); ++ ++ int shutdown(); ++ ++ void handle_signal(int sinnum); ++ ++ void run(); ++ ++ void handle_request(uint64_t sesstion_id, std::string msg); ++ ++ private: ++ CacheServer *m_cache_server; ++ std::vector<const char*> m_args; ++ CephContext *m_cct; ++ ObjectCacheStore *m_object_cache_store; ++ ContextWQ* pcache_op_work_queue; ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc +new file mode 100644 +index 0000000..dd2d47e +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheServer.cc +@@ -0,0 +1,99 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/debug.h" ++#include "common/ceph_context.h" ++#include "CacheServer.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \ ++ << __func__ << ": " ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++CacheServer::CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) ++ : cct(cct), m_server_process_msg(processmsg), ++ m_local_path(file), m_acceptor(m_io_service) {} ++ ++CacheServer::~CacheServer(){} ++ ++void CacheServer::run() { ++ bool ret; ++ ret = start_accept(); ++ if(!ret) { ++ return; ++ } ++ m_io_service.run(); ++} ++ ++// TODO : use callback to replace this function. ++void CacheServer::send(uint64_t session_id, std::string msg) { ++ auto it = m_session_map.find(session_id); ++ if (it != m_session_map.end()) { ++ it->second->send(msg); ++ } else { ++ // TODO : why don't find existing session id ? ++ ldout(cct, 20) << "don't find session id..." << dendl; ++ assert(0); ++ } ++} ++ ++// when creating one acceptor, can control every step in this way. ++bool CacheServer::start_accept() { ++ boost::system::error_code ec; ++ m_acceptor.open(m_local_path.protocol(), ec); ++ if(ec) { ++ ldout(cct, 20) << "m_acceptor open fails: " << ec.message() << dendl; ++ return false; ++ } ++ ++ // TODO control acceptor attribute. ++ ++ m_acceptor.bind(m_local_path, ec); ++ if(ec) { ++ ldout(cct, 20) << "m_acceptor bind fails: " << ec.message() << dendl; ++ return false; ++ } ++ ++ m_acceptor.listen(boost::asio::socket_base::max_connections, ec); ++ if(ec) { ++ ldout(cct, 20) << "m_acceptor listen fails: " << ec.message() << dendl; ++ return false; ++ } ++ ++ accept(); ++ return true; ++} ++ ++void CacheServer::accept() { ++ CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct)); ++ m_acceptor.async_accept(new_session->socket(), ++ boost::bind(&CacheServer::handle_accept, this, new_session, ++ boost::asio::placeholders::error)); ++} ++ ++void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) { ++ ++ if(error) { ++ lderr(cct) << "async accept fails : " << error.message() << dendl; ++ assert(0); // TODO ++ } ++ ++ m_session_map.emplace(m_session_id, new_session); ++ // TODO : session setting ++ new_session->start(); ++ m_session_id++; ++ ++ // lanuch next accept ++ accept(); ++} ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ +diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h +new file mode 100644 +index 0000000..6c5c133 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheServer.h +@@ -0,0 +1,54 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_SERVER_H ++#define CEPH_CACHE_SERVER_H ++ ++#include <cstdio> ++#include <iostream> ++#include <array> ++#include <memory> ++#include <string> ++#include <boost/bind.hpp> ++#include <boost/asio.hpp> ++#include <boost/asio/error.hpp> ++#include <boost/algorithm/string.hpp> ++ ++#include "include/assert.h" ++#include "SocketCommon.h" ++#include "CacheSession.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class CacheServer { ++ ++ public: ++ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct); ++ ~CacheServer(); ++ ++ void run(); ++ void send(uint64_t session_id, std::string msg); ++ ++ private: ++ bool start_accept(); ++ void accept(); ++ void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error); ++ ++ private: ++ CephContext* cct; ++ boost::asio::io_service m_io_service; // TODO wrapper it. ++ ProcessMsg m_server_process_msg; ++ stream_protocol::endpoint m_local_path; ++ stream_protocol::acceptor m_acceptor; ++ uint64_t m_session_id = 1; ++ std::map<uint64_t, CacheSessionPtr> m_session_map; ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc +new file mode 100644 +index 0000000..6cffb41 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheSession.cc +@@ -0,0 +1,115 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/debug.h" ++#include "common/ceph_context.h" ++#include "CacheSession.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \ ++ << __func__ << ": " ++ ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct) ++ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct) ++ {} ++ ++CacheSession::~CacheSession(){} ++ ++stream_protocol::socket& CacheSession::socket() { ++ return m_dm_socket; ++} ++ ++void CacheSession::start() { ++ if(true) { ++ serial_handing_request(); ++ } else { ++ parallel_handing_request(); ++ } ++} ++// flow: ++// ++// recv request --> process request --> reply ack ++// | | ++// --------------<------------------------- ++void CacheSession::serial_handing_request() { ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ boost::bind(&CacheSession::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++} ++ ++// flow : ++// ++// --> thread 1: process request ++// recv request --> thread 2: process request --> reply ack ++// --> thread n: process request ++// ++void CacheSession::parallel_handing_request() { ++ // TODO ++} ++ ++void CacheSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred) { ++ // when recv eof, the most proble is that client side close socket. ++ // so, server side need to end handing_request ++ if(error == boost::asio::error::eof) { ++ ldout(cct, 20) << "session: async_read : " << error.message() << dendl; ++ return; ++ } ++ ++ if(error) { ++ ldout(cct, 20) << "session: async_read fails: " << error.message() << dendl; ++ assert(0); ++ } ++ ++ if(bytes_transferred != RBDSC_MSG_LEN) { ++ ldout(cct, 20) << "session : request in-complete. "<<dendl; ++ assert(0); ++ } ++ ++ // TODO async_process can increse coding readable. ++ // process_msg_callback call handle async_send ++ process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); ++} ++ ++void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) { ++ if (error) { ++ ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl; ++ assert(0); ++ } ++ ++ if(bytes_transferred != RBDSC_MSG_LEN) { ++ ldout(cct, 20) << "session : reply in-complete. "<<dendl; ++ assert(0); ++ } ++ ++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ boost::bind(&CacheSession::handle_read, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ ++} ++ ++void CacheSession::send(std::string msg) { ++ boost::asio::async_write(m_dm_socket, ++ boost::asio::buffer(msg.c_str(), msg.size()), ++ boost::asio::transfer_exactly(RBDSC_MSG_LEN), ++ boost::bind(&CacheSession::handle_write, ++ shared_from_this(), ++ boost::asio::placeholders::error, ++ boost::asio::placeholders::bytes_transferred)); ++ ++} ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ +diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h +new file mode 100644 +index 0000000..ce2591b +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/CacheSession.h +@@ -0,0 +1,58 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_SESSION_H ++#define CEPH_CACHE_SESSION_H ++ ++#include <iostream> ++#include <string> ++#include <boost/bind.hpp> ++#include <boost/asio.hpp> ++#include <boost/asio/error.hpp> ++#include <boost/algorithm/string.hpp> ++ ++#include "include/assert.h" ++#include "SocketCommon.h" ++ ++ ++using boost::asio::local::stream_protocol; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++class CacheSession : public std::enable_shared_from_this<CacheSession> { ++public: ++ CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct); ++ ~CacheSession(); ++ ++ stream_protocol::socket& socket(); ++ void start(); ++ void serial_handing_request(); ++ void parallel_handing_request(); ++ ++private: ++ ++ void handle_read(const boost::system::error_code& error, size_t bytes_transferred); ++ ++ void handle_write(const boost::system::error_code& error, size_t bytes_transferred); ++ ++public: ++ void send(std::string msg); ++ ++private: ++ uint64_t m_session_id; ++ stream_protocol::socket m_dm_socket; ++ ProcessMsg process_msg; ++ CephContext* cct; ++ ++ // Buffer used to store data received from the client. ++ //std::array<char, 1024> data_; ++ char m_buffer[1024]; ++}; ++ ++typedef std::shared_ptr<CacheSession> CacheSessionPtr; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++ ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc +new file mode 100644 +index 0000000..50721ca +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc +@@ -0,0 +1,172 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "ObjectCacheStore.h" ++ ++#define dout_context g_ceph_context ++#define dout_subsys ceph_subsys_immutable_obj_cache ++#undef dout_prefix ++#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \ ++ << __func__ << ": " ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) ++ : m_cct(cct), m_work_queue(work_queue), ++ m_rados(new librados::Rados()) { ++ ++ uint64_t object_cache_entries = ++ cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); ++ ++ //TODO(): allow to set level ++ m_policy = new SimplePolicy(object_cache_entries, 0.5); ++} ++ ++ObjectCacheStore::~ObjectCacheStore() { ++ delete m_policy; ++} ++ ++int ObjectCacheStore::init(bool reset) { ++ ++ int ret = m_rados->init_with_context(m_cct); ++ if(ret < 0) { ++ lderr(m_cct) << "fail to init Ceph context" << dendl; ++ return ret; ++ } ++ ++ ret = m_rados->connect(); ++ if(ret < 0 ) { ++ lderr(m_cct) << "fail to conect to cluster" << dendl; ++ return ret; ++ } ++ ++ std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); ++ //TODO(): check and reuse existing cache objects ++ if(reset) { ++ std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; ++ //TODO(): to use std::filesystem ++ int r = system(cmd.c_str()); ++ } ++ ++ evict_thd = new std::thread([this]{this->evict_thread_body();}); ++ return ret; ++} ++ ++int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { ++ int ret = 0; ++ std::string cache_file_name = pool_name + object_name; ++ ++ //TODO(): lock on ioctx map ++ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { ++ librados::IoCtx* io_ctx = new librados::IoCtx(); ++ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); ++ if (ret < 0) { ++ lderr(m_cct) << "fail to create ioctx" << dendl; ++ assert(0); ++ } ++ m_ioctxs.emplace(pool_name, io_ctx); ++ } ++ ++ assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); ++ ++ librados::IoCtx* ioctx = m_ioctxs[pool_name]; ++ ++ librados::bufferlist* read_buf = new librados::bufferlist(); ++ int object_size = 4096*1024; //TODO(): read config from image metadata ++ ++ //TODO(): async promote ++ ret = promote_object(ioctx, object_name, read_buf, object_size); ++ if (ret == -ENOENT) { ++ read_buf->append(std::string(object_size, '0')); ++ ret = 0; ++ } ++ ++ if( ret < 0) { ++ lderr(m_cct) << "fail to read from rados" << dendl; ++ return ret; ++ } ++ ++ // persistent to cache ++ librbd::cache::SyncFile cache_file(m_cct, cache_file_name); ++ cache_file.open(); ++ ret = cache_file.write_object_to_file(*read_buf, object_size); ++ ++ // update metadata ++ assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); ++ m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); ++ assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); ++ ++ return ret; ++ ++} ++ ++// return -1, client need to read data from cluster. ++// return 0, client directly read data from cache. ++int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { ++ ++ std::string cache_file_name = pool_name + object_name; ++ ++ CACHESTATUS ret; ++ ret = m_policy->lookup_object(cache_file_name); ++ ++ switch(ret) { ++ case OBJ_CACHE_NONE: ++ return do_promote(pool_name, object_name); ++ case OBJ_CACHE_PROMOTED: ++ return 0; ++ case OBJ_CACHE_PROMOTING: ++ default: ++ return -1; ++ } ++} ++ ++void ObjectCacheStore::evict_thread_body() { ++ int ret; ++ while(m_evict_go) { ++ ret = evict_objects(); ++ } ++} ++ ++ ++int ObjectCacheStore::shutdown() { ++ m_evict_go = false; ++ evict_thd->join(); ++ m_rados->shutdown(); ++ return 0; ++} ++ ++int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { ++ return 0; ++} ++ ++int ObjectCacheStore::lock_cache(std::string vol_name) { ++ return 0; ++} ++ ++int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { ++ int ret; ++ ++ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); ++ ++ ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); ++ if(ret < 0) { ++ lderr(m_cct) << "fail to read from rados" << dendl; ++ return ret; ++ } ++ read_completion->wait_for_complete(); ++ ret = read_completion->get_return_value(); ++ return ret; ++ ++} ++ ++int ObjectCacheStore::evict_objects() { ++ std::list<std::string> obj_list; ++ m_policy->get_evict_list(&obj_list); ++ for (auto& obj: obj_list) { ++ //do_evict(obj); ++ } ++} ++ ++} // namespace immutable_obj_cache ++} // namespace ceph +diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h +new file mode 100644 +index 0000000..d044b27 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h +@@ -0,0 +1,70 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H ++#define CEPH_CACHE_OBJECT_CACHE_STORE_H ++ ++#include "common/debug.h" ++#include "common/errno.h" ++#include "common/ceph_context.h" ++#include "common/Mutex.h" ++#include "include/rados/librados.hpp" ++#include "include/rbd/librbd.h" ++#include "librbd/ImageCtx.h" ++#include "librbd/ImageState.h" ++#include "librbd/cache/SharedPersistentObjectCacherFile.h" ++#include "SimplePolicy.hpp" ++ ++ ++using librados::Rados; ++using librados::IoCtx; ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++typedef shared_ptr<librados::Rados> RadosRef; ++typedef shared_ptr<librados::IoCtx> IoCtxRef; ++ ++class ObjectCacheStore ++{ ++ public: ++ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); ++ ~ObjectCacheStore(); ++ ++ int init(bool reset); ++ ++ int shutdown(); ++ ++ int lookup_object(std::string pool_name, std::string object_name); ++ ++ int init_cache(std::string vol_name, uint64_t vol_size); ++ ++ int lock_cache(std::string vol_name); ++ ++ private: ++ void evict_thread_body(); ++ int evict_objects(); ++ ++ int do_promote(std::string pool_name, std::string object_name); ++ ++ int promote_object(librados::IoCtx*, std::string object_name, ++ librados::bufferlist* read_buf, ++ uint64_t length); ++ ++ CephContext *m_cct; ++ ContextWQ* m_work_queue; ++ RadosRef m_rados; ++ ++ ++ std::map<std::string, librados::IoCtx*> m_ioctxs; ++ ++ librbd::cache::SyncFile *m_cache_file; ++ ++ Policy* m_policy; ++ std::thread* evict_thd; ++ bool m_evict_go = false; ++}; ++ ++} // namespace ceph ++} // namespace immutable_obj_cache ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/Policy.hpp b/src/tools/ceph_immutable_object_cache/Policy.hpp +new file mode 100644 +index 0000000..8090202 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/Policy.hpp +@@ -0,0 +1,33 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_POLICY_HPP ++#define CEPH_CACHE_POLICY_HPP ++ ++#include <list> ++#include <string> ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++enum CACHESTATUS { ++ OBJ_CACHE_NONE = 0, ++ OBJ_CACHE_PROMOTING, ++ OBJ_CACHE_PROMOTED, ++}; ++ ++ ++class Policy { ++public: ++ Policy(){} ++ virtual ~Policy(){}; ++ virtual CACHESTATUS lookup_object(std::string) = 0; ++ virtual int evict_object(std::string&) = 0; ++ virtual void update_status(std::string, CACHESTATUS) = 0; ++ virtual CACHESTATUS get_status(std::string) = 0; ++ virtual void get_evict_list(std::list<std::string>* obj_list) = 0; ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp +new file mode 100644 +index 0000000..757ee6a +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp +@@ -0,0 +1,163 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP ++#define CEPH_CACHE_SIMPLE_POLICY_HPP ++ ++#include "Policy.hpp" ++#include "include/lru.h" ++#include "common/RWLock.h" ++#include "common/Mutex.h" ++ ++#include <vector> ++#include <unordered_map> ++#include <string> ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++ ++class SimplePolicy : public Policy { ++public: ++ SimplePolicy(uint64_t block_num, float watermark) ++ : m_watermark(watermark), m_entry_count(block_num), ++ m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), ++ m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") ++ { ++ ++ for(uint64_t i = 0; i < m_entry_count; i++) { ++ m_free_list.push_back(new Entry()); ++ } ++ ++ } ++ ++ ~SimplePolicy() { ++ for(uint64_t i = 0; i < m_entry_count; i++) { ++ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); ++ delete entry; ++ m_free_list.pop_front(); ++ } ++ } ++ ++ CACHESTATUS lookup_object(std::string cache_file_name) { ++ ++ //TODO(): check race condition ++ RWLock::WLocker wlocker(m_cache_map_lock); ++ ++ auto entry_it = m_cache_map.find(cache_file_name); ++ if(entry_it == m_cache_map.end()) { ++ Mutex::Locker locker(m_free_list_lock); ++ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); ++ assert(entry != nullptr); ++ m_free_list.pop_front(); ++ entry->status = OBJ_CACHE_PROMOTING; ++ ++ m_cache_map[cache_file_name] = entry; ++ ++ return OBJ_CACHE_NONE; ++ } ++ ++ Entry* entry = entry_it->second; ++ ++ if(entry->status == OBJ_CACHE_PROMOTED) { ++ // touch it ++ m_promoted_lru.lru_touch(entry); ++ } ++ ++ return entry->status; ++ } ++ ++ int evict_object(std::string& out_cache_file_name) { ++ RWLock::WLocker locker(m_cache_map_lock); ++ ++ return 1; ++ } ++ ++ // TODO(): simplify the logic ++ void update_status(std::string file_name, CACHESTATUS new_status) { ++ RWLock::WLocker locker(m_cache_map_lock); ++ ++ Entry* entry; ++ auto entry_it = m_cache_map.find(file_name); ++ ++ // just check. ++ if(new_status == OBJ_CACHE_PROMOTING) { ++ assert(entry_it == m_cache_map.end()); ++ } ++ ++ assert(entry_it != m_cache_map.end()); ++ ++ entry = entry_it->second; ++ ++ // promoting is done, so update it. ++ if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { ++ m_promoted_lru.lru_insert_top(entry); ++ entry->status = new_status; ++ return; ++ } ++ ++ assert(0); ++ } ++ ++ // get entry status ++ CACHESTATUS get_status(std::string file_name) { ++ RWLock::RLocker locker(m_cache_map_lock); ++ auto entry_it = m_cache_map.find(file_name); ++ if(entry_it == m_cache_map.end()) { ++ return OBJ_CACHE_NONE; ++ } ++ ++ return entry_it->second->status; ++ } ++ ++ void get_evict_list(std::list<std::string>* obj_list) { ++ RWLock::WLocker locker(m_cache_map_lock); ++ // check free ratio, pop entries from LRU ++ if (m_free_list.size() / m_entry_count < m_watermark) { ++ int evict_num = 10; //TODO(): make this configurable ++ for(int i = 0; i < evict_num; i++) { ++ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); ++ if (entry == nullptr) { ++ continue; ++ } ++ std::string file_name = entry->cache_file_name; ++ obj_list->push_back(file_name); ++ ++ auto entry_it = m_cache_map.find(file_name); ++ m_cache_map.erase(entry_it); ++ ++ //mark this entry as free ++ entry->status = OBJ_CACHE_NONE; ++ Mutex::Locker locker(m_free_list_lock); ++ m_free_list.push_back(entry); ++ } ++ } ++ } ++ ++private: ++ ++ class Entry : public LRUObject { ++ public: ++ CACHESTATUS status; ++ Entry() : status(OBJ_CACHE_NONE){} ++ std::string cache_file_name; ++ void encode(bufferlist &bl){} ++ void decode(bufferlist::iterator &it){} ++ }; ++ ++ float m_watermark; ++ uint64_t m_entry_count; ++ ++ std::unordered_map<std::string, Entry*> m_cache_map; ++ RWLock m_cache_map_lock; ++ ++ std::deque<Entry*> m_free_list; ++ Mutex m_free_list_lock; ++ ++ LRU m_promoted_lru; // include promoted, using status. ++ ++}; ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h +new file mode 100644 +index 0000000..53dca54 +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/SocketCommon.h +@@ -0,0 +1,54 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#ifndef CEPH_CACHE_SOCKET_COMMON_H ++#define CEPH_CACHE_SOCKET_COMMON_H ++ ++namespace ceph { ++namespace immutable_obj_cache { ++ ++static const int RBDSC_REGISTER = 0X11; ++static const int RBDSC_READ = 0X12; ++static const int RBDSC_LOOKUP = 0X13; ++static const int RBDSC_REGISTER_REPLY = 0X14; ++static const int RBDSC_READ_REPLY = 0X15; ++static const int RBDSC_LOOKUP_REPLY = 0X16; ++static const int RBDSC_READ_RADOS = 0X17; ++ ++ ++ ++typedef std::function<void(uint64_t, std::string)> ProcessMsg; ++typedef std::function<void(std::string)> ClientProcessMsg; ++typedef uint8_t rbdsc_req_type; ++ ++//TODO(): switch to bufferlist ++struct rbdsc_req_type_t { ++ rbdsc_req_type type; ++ uint64_t vol_size; ++ uint64_t offset; ++ uint64_t length; ++ char pool_name[256]; ++ char vol_name[256]; ++ ++ uint64_t size() { ++ return sizeof(rbdsc_req_type_t); ++ } ++ ++ std::string to_buffer() { ++ std::stringstream ss; ++ ss << type; ++ ss << vol_size; ++ ss << offset; ++ ss << length; ++ ss << pool_name; ++ ss << vol_name; ++ ++ return ss.str(); ++ } ++}; ++ ++static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); ++ ++} // namespace immutable_obj_cache ++} // namespace ceph ++#endif +diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc +new file mode 100644 +index 0000000..7a9131d +--- /dev/null ++++ b/src/tools/ceph_immutable_object_cache/main.cc +@@ -0,0 +1,85 @@ ++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- ++// vim: ts=8 sw=2 smarttab ++ ++#include "common/ceph_argparse.h" ++#include "common/config.h" ++#include "common/debug.h" ++#include "common/errno.h" ++#include "global/global_init.h" ++#include "global/signal_handler.h" ++#include "CacheController.h" ++ ++#include <vector> ++ ++ceph::immutable_obj_cache::CacheController *cachectl = nullptr; ++ ++void usage() { ++ std::cout << "usage: cache controller [options...]" << std::endl; ++ std::cout << "options:\n"; ++ std::cout << " -m monaddress[:port] connect to specified monitor\n"; ++ std::cout << " --keyring=<path> path to keyring for local cluster\n"; ++ std::cout << " --log-file=<logfile> file to log debug output\n"; ++ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; ++ generic_server_usage(); ++} ++ ++static void handle_signal(int signum) ++{ ++ if (cachectl) ++ cachectl->handle_signal(signum); ++} ++ ++int main(int argc, const char **argv) ++{ ++ std::vector<const char*> args; ++ env_to_vec(args); ++ argv_to_vec(argc, argv, args); ++ ++ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, ++ CODE_ENVIRONMENT_DAEMON, ++ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); ++ ++ for (auto i = args.begin(); i != args.end(); ++i) { ++ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { ++ usage(); ++ return EXIT_SUCCESS; ++ } ++ } ++ ++ if (g_conf()->daemonize) { ++ global_init_daemonize(g_ceph_context); ++ } ++ g_ceph_context->enable_perf_counter(); ++ ++ common_init_finish(g_ceph_context); ++ ++ init_async_signal_handler(); ++ register_async_signal_handler(SIGHUP, sighup_handler); ++ register_async_signal_handler_oneshot(SIGINT, handle_signal); ++ register_async_signal_handler_oneshot(SIGTERM, handle_signal); ++ ++ std::vector<const char*> cmd_args; ++ argv_to_vec(argc, argv, cmd_args); ++ ++ // disable unnecessary librbd cache ++ g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); ++ ++ cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args); ++ int r = cachectl->init(); ++ if (r < 0) { ++ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; ++ goto cleanup; ++ } ++ ++ cachectl->run(); ++ ++ cleanup: ++ unregister_async_signal_handler(SIGHUP, sighup_handler); ++ unregister_async_signal_handler(SIGINT, handle_signal); ++ unregister_async_signal_handler(SIGTERM, handle_signal); ++ shutdown_async_signal_handler(); ++ ++ delete cachectl; ++ ++ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; ++} +diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt +deleted file mode 100644 +index 597d802..0000000 +--- a/src/tools/rbd_cache/CMakeLists.txt ++++ /dev/null +@@ -1,9 +0,0 @@ +-add_executable(rbd-cache +- ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc +- ObjectCacheStore.cc +- CacheController.cc +- main.cc) +-target_link_libraries(rbd-cache +- librados +- global) +-install(TARGETS rbd-cache DESTINATION bin) +diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc +deleted file mode 100644 +index 620192c..0000000 +--- a/src/tools/rbd_cache/CacheController.cc ++++ /dev/null +@@ -1,116 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "CacheController.h" +- +-#define dout_context g_ceph_context +-#define dout_subsys ceph_subsys_rbd_cache +-#undef dout_prefix +-#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ +- << __func__ << ": " +- +-namespace rbd { +-namespace cache { +- +-class ThreadPoolSingleton : public ThreadPool { +-public: +- ContextWQ *op_work_queue; +- +- explicit ThreadPoolSingleton(CephContext *cct) +- : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, +- "pcache_threads"), +- op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", +- cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), +- this)) { +- start(); +- } +- ~ThreadPoolSingleton() override { +- op_work_queue->drain(); +- delete op_work_queue; +- +- stop(); +- } +-}; +- +- +-CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): +- m_args(args), m_cct(cct) { +- +-} +- +-CacheController::~CacheController() { +- +-} +- +-int CacheController::init() { +- ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( +- "rbd::cache::thread_pool", false, m_cct); +- pcache_op_work_queue = thread_pool_singleton->op_work_queue; +- +- m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); +- int r = m_object_cache_store->init(false); +- if (r < 0) { +- //derr << "init error\n" << dendl; +- } +- return r; +-} +- +-int CacheController::shutdown() { +- int r = m_object_cache_store->shutdown(); +- return r; +-} +- +-void CacheController::handle_signal(int signum){} +- +-void CacheController::run() { +- try { +- //TODO(): use new socket path +- std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); +- std::remove(controller_path.c_str()); +- +- m_cache_server = new CacheServer(controller_path, +- ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); +- m_cache_server->run(); +- } catch (std::exception& e) { +- std::cerr << "Exception: " << e.what() << "\n"; +- } +-} +- +-void CacheController::handle_request(uint64_t session_id, std::string msg){ +- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); +- +- int ret = 0; +- +- switch (io_ctx->type) { +- case RBDSC_REGISTER: { +- // init cache layout for volume +- m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); +- io_ctx->type = RBDSC_REGISTER_REPLY; +- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); +- +- break; +- } +- case RBDSC_READ: { +- // lookup object in local cache store +- ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); +- if (ret < 0) { +- io_ctx->type = RBDSC_READ_RADOS; +- } else { +- io_ctx->type = RBDSC_READ_REPLY; +- } +- if (io_ctx->type != RBDSC_READ_REPLY) { +- assert(0); +- } +- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); +- +- break; +- } +- std::cout<<"can't recongize request"<<std::endl; +- assert(0); // TODO replace it. +- } +-} +- +-} // namespace rbd +-} // namespace cache +- +- +diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h +deleted file mode 100644 +index 0e23484..0000000 +--- a/src/tools/rbd_cache/CacheController.h ++++ /dev/null +@@ -1,54 +0,0 @@ +-#ifndef CACHE_CONTROLLER_H +-#define CACHE_CONTROLLER_H +- +-#include <thread> +- +-#include "common/Formatter.h" +-#include "common/admin_socket.h" +-#include "common/debug.h" +-#include "common/errno.h" +-#include "common/ceph_context.h" +-#include "common/Mutex.h" +-#include "common/WorkQueue.h" +-#include "include/rados/librados.hpp" +-#include "include/rbd/librbd.h" +-#include "include/assert.h" +-#include "librbd/ImageCtx.h" +-#include "librbd/ImageState.h" +- +-#include "CacheControllerSocket.hpp" +-#include "ObjectCacheStore.h" +- +- +-using boost::asio::local::stream_protocol; +- +-namespace rbd { +-namespace cache { +- +-class CacheController { +- public: +- CacheController(CephContext *cct, const std::vector<const char*> &args); +- ~CacheController(); +- +- int init(); +- +- int shutdown(); +- +- void handle_signal(int sinnum); +- +- void run(); +- +- void handle_request(uint64_t sesstion_id, std::string msg); +- +- private: +- CacheServer *m_cache_server; +- std::vector<const char*> m_args; +- CephContext *m_cct; +- ObjectCacheStore *m_object_cache_store; +- ContextWQ* pcache_op_work_queue; +-}; +- +-} // namespace rbd +-} // namespace cache +- +-#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp +deleted file mode 100644 +index 2ff7477..0000000 +--- a/src/tools/rbd_cache/CacheControllerSocket.hpp ++++ /dev/null +@@ -1,228 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CACHE_CONTROLLER_SOCKET_H +-#define CACHE_CONTROLLER_SOCKET_H +- +-#include <cstdio> +-#include <iostream> +-#include <array> +-#include <memory> +-#include <string> +-#include <boost/bind.hpp> +-#include <boost/asio.hpp> +-#include <boost/asio/error.hpp> +-#include <boost/algorithm/string.hpp> +-#include "CacheControllerSocketCommon.h" +- +- +-using boost::asio::local::stream_protocol; +- +-namespace rbd { +-namespace cache { +- +-class session : public std::enable_shared_from_this<session> { +-public: +- session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) +- : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {} +- +- stream_protocol::socket& socket() { +- return m_dm_socket; +- } +- +- void start() { +- if(true) { +- serial_handing_request(); +- } else { +- parallel_handing_request(); +- } +- } +- // flow: +- // +- // recv request --> process request --> reply ack +- // | | +- // --------------<------------------------- +- void serial_handing_request() { +- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- boost::bind(&session::handle_read, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); +- } +- +- // flow : +- // +- // --> thread 1: process request +- // recv request --> thread 2: process request --> reply ack +- // --> thread n: process request +- // +- void parallel_handing_request() { +- // TODO +- } +- +-private: +- +- void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { +- // when recv eof, the most proble is that client side close socket. +- // so, server side need to end handing_request +- if(error == boost::asio::error::eof) { +- std::cout<<"session: async_read : " << error.message() << std::endl; +- return; +- } +- +- if(error) { +- std::cout<<"session: async_read fails: " << error.message() << std::endl; +- assert(0); +- } +- +- if(bytes_transferred != RBDSC_MSG_LEN) { +- std::cout<<"session : request in-complete. "<<std::endl; +- assert(0); +- } +- +- // TODO async_process can increse coding readable. +- // process_msg_callback call handle async_send +- process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); +- } +- +- void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { +- if (error) { +- std::cout<<"session: async_write fails: " << error.message() << std::endl; +- assert(0); +- } +- +- if(bytes_transferred != RBDSC_MSG_LEN) { +- std::cout<<"session : reply in-complete. "<<std::endl; +- assert(0); +- } +- +- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- boost::bind(&session::handle_read, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); +- +- } +- +-public: +- void send(std::string msg) { +- boost::asio::async_write(m_dm_socket, +- boost::asio::buffer(msg.c_str(), msg.size()), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- boost::bind(&session::handle_write, +- shared_from_this(), +- boost::asio::placeholders::error, +- boost::asio::placeholders::bytes_transferred)); +- +- } +- +-private: +- uint64_t m_session_id; +- stream_protocol::socket m_dm_socket; +- ProcessMsg process_msg; +- +- // Buffer used to store data received from the client. +- //std::array<char, 1024> data_; +- char m_buffer[1024]; +-}; +- +-typedef std::shared_ptr<session> session_ptr; +- +-class CacheServer { +-public: +- CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) +- : m_cct(cct), m_server_process_msg(processmsg), +- m_local_path(file), +- m_acceptor(m_io_service) +- {} +- +- void run() { +- bool ret; +- ret = start_accept(); +- if(!ret) { +- return; +- } +- m_io_service.run(); +- } +- +- // TODO : use callback to replace this function. +- void send(uint64_t session_id, std::string msg) { +- auto it = m_session_map.find(session_id); +- if (it != m_session_map.end()) { +- it->second->send(msg); +- } else { +- // TODO : why don't find existing session id ? +- std::cout<<"don't find session id..."<<std::endl; +- assert(0); +- } +- } +- +-private: +- // when creating one acceptor, can control every step in this way. +- bool start_accept() { +- boost::system::error_code ec; +- m_acceptor.open(m_local_path.protocol(), ec); +- if(ec) { +- std::cout << "m_acceptor open fails: " << ec.message() << std::endl; +- return false; +- } +- +- // TODO control acceptor attribute. +- +- m_acceptor.bind(m_local_path, ec); +- if(ec) { +- std::cout << "m_acceptor bind fails: " << ec.message() << std::endl; +- return false; +- } +- +- m_acceptor.listen(boost::asio::socket_base::max_connections, ec); +- if(ec) { +- std::cout << "m_acceptor listen fails: " << ec.message() << std::endl; +- return false; +- } +- +- accept(); +- return true; +- } +- +- void accept() { +- session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg)); +- m_acceptor.async_accept(new_session->socket(), +- boost::bind(&CacheServer::handle_accept, this, new_session, +- boost::asio::placeholders::error)); +- } +- +- void handle_accept(session_ptr new_session, const boost::system::error_code& error) { +- //TODO(): open librbd snap ... yuan +- +- if(error) { +- std::cout << "async accept fails : " << error.message() << std::endl; +- assert(0); // TODO +- } +- +- // must put session into m_session_map at the front of session.start() +- m_session_map.emplace(m_session_id, new_session); +- // TODO : session setting +- new_session->start(); +- m_session_id++; +- +- // lanuch next accept +- accept(); +- } +- +-private: +- CephContext* m_cct; +- boost::asio::io_service m_io_service; // TODO wrapper it. +- ProcessMsg m_server_process_msg; +- stream_protocol::endpoint m_local_path; +- stream_protocol::acceptor m_acceptor; +- uint64_t m_session_id = 1; +- std::map<uint64_t, session_ptr> m_session_map; +-}; +- +-} // namespace cache +-} // namespace rbd +- +-#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp +deleted file mode 100644 +index 964f888..0000000 +--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp ++++ /dev/null +@@ -1,229 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H +-#define CACHE_CONTROLLER_SOCKET_CLIENT_H +- +-#include <atomic> +-#include <boost/asio.hpp> +-#include <boost/bind.hpp> +-#include <boost/asio/error.hpp> +-#include <boost/algorithm/string.hpp> +-#include "librbd/ImageCtx.h" +-#include "include/assert.h" +-#include "include/Context.h" +-#include "CacheControllerSocketCommon.h" +- +- +-using boost::asio::local::stream_protocol; +- +-namespace rbd { +-namespace cache { +- +-class CacheClient { +-public: +- CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) +- : m_io_service_work(m_io_service), +- m_dm_socket(m_io_service), +- m_client_process_msg(processmsg), +- m_ep(stream_protocol::endpoint(file)), +- m_session_work(false), +- cct(ceph_ctx) +- { +- // TODO wrapper io_service +- std::thread thd([this](){ +- m_io_service.run();}); +- thd.detach(); +- } +- +- void run(){ +- } +- +- bool is_session_work() { +- return m_session_work.load() == true; +- } +- +- // just when error occur, call this method. +- void close() { +- m_session_work.store(false); +- boost::system::error_code close_ec; +- m_dm_socket.close(close_ec); +- if(close_ec) { +- std::cout << "close: " << close_ec.message() << std::endl; +- } +- std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl; +- } +- +- int connect() { +- boost::system::error_code ec; +- m_dm_socket.connect(m_ep, ec); +- if(ec) { +- if(ec == boost::asio::error::connection_refused) { +- std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. " +- << "Now data will be read from ceph cluster " << std::endl; +- } else { +- std::cout << "connect: " << ec.message() << std::endl; +- } +- +- if(m_dm_socket.is_open()) { +- // Set to indicate what error occurred, if any. +- // Note that, even if the function indicates an error, +- // the underlying descriptor is closed. +- boost::system::error_code close_ec; +- m_dm_socket.close(close_ec); +- if(close_ec) { +- std::cout << "close: " << close_ec.message() << std::endl; +- } +- } +- return -1; +- } +- +- std::cout<<"connect success"<<std::endl; +- +- return 0; +- } +- +- int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { +- // cache controller will init layout +- rbdsc_req_type_t *message = new rbdsc_req_type_t(); +- message->type = RBDSC_REGISTER; +- memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); +- memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); +- message->vol_size = vol_size; +- message->offset = 0; +- message->length = 0; +- +- uint64_t ret; +- boost::system::error_code ec; +- +- ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); +- if(ec) { +- std::cout << "write fails : " << ec.message() << std::endl; +- return -1; +- } +- +- if(ret != message->size()) { +- std::cout << "write fails : ret != send_bytes "<< std::endl; +- return -1; +- } +- +- // hard code TODO +- ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); +- if(ec == boost::asio::error::eof) { +- std::cout<< "recv eof"<<std::endl; +- return -1; +- } +- +- if(ec) { +- std::cout << "write fails : " << ec.message() << std::endl; +- return -1; +- } +- +- if(ret != RBDSC_MSG_LEN) { +- std::cout << "write fails : ret != receive bytes " << std::endl; +- return -1; +- } +- +- m_client_process_msg(std::string(m_recv_buffer, ret)); +- +- delete message; +- +- std::cout << "register volume success" << std::endl; +- +- // TODO +- m_session_work.store(true); +- +- return 0; +- } +- +- // if occur any error, we just return false. Then read from rados. +- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { +- rbdsc_req_type_t *message = new rbdsc_req_type_t(); +- message->type = RBDSC_READ; +- memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); +- memcpy(message->vol_name, object_id.c_str(), object_id.size()); +- message->vol_size = 0; +- message->offset = 0; +- message->length = 0; +- +- boost::asio::async_write(m_dm_socket, +- boost::asio::buffer((char*)message, message->size()), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- [this, on_finish, message](const boost::system::error_code& err, size_t cb) { +- delete message; +- if(err) { +- std::cout<< "lookup_object: async_write fails." << err.message() << std::endl; +- close(); +- on_finish->complete(false); +- return; +- } +- if(cb != RBDSC_MSG_LEN) { +- std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl; +- close(); +- on_finish->complete(false); +- return; +- } +- get_result(on_finish); +- }); +- +- return 0; +- } +- +- void get_result(Context* on_finish) { +- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), +- boost::asio::transfer_exactly(RBDSC_MSG_LEN), +- [this, on_finish](const boost::system::error_code& err, size_t cb) { +- if(err == boost::asio::error::eof) { +- std::cout<<"get_result: ack is EOF." << std::endl; +- close(); +- on_finish->complete(false); +- return; +- } +- if(err) { +- std::cout<< "get_result: async_read fails:" << err.message() << std::endl; +- close(); +- on_finish->complete(false); // TODO replace this assert with some metohds. +- return; +- } +- if (cb != RBDSC_MSG_LEN) { +- close(); +- std::cout << "get_result: in-complete ack." << std::endl; +- on_finish->complete(false); // TODO: replace this assert with some methods. +- } +- +- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); +- +- // TODO: re-occur yuan's bug +- if(io_ctx->type == RBDSC_READ) { +- std::cout << "get rbdsc_read... " << std::endl; +- assert(0); +- } +- +- if (io_ctx->type == RBDSC_READ_REPLY) { +- on_finish->complete(true); +- return; +- } else { +- on_finish->complete(false); +- return; +- } +- }); +- } +- +-private: +- boost::asio::io_service m_io_service; +- boost::asio::io_service::work m_io_service_work; +- stream_protocol::socket m_dm_socket; +- ClientProcessMsg m_client_process_msg; +- stream_protocol::endpoint m_ep; +- char m_recv_buffer[1024]; +- +- // atomic modfiy for this variable. +- // thread 1 : asio callback thread modify it. +- // thread 2 : librbd read it. +- std::atomic<bool> m_session_work; +- CephContext* cct; +-}; +- +-} // namespace cache +-} // namespace rbd +-#endif +diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h +deleted file mode 100644 +index e17529a..0000000 +--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h ++++ /dev/null +@@ -1,62 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H +-#define CACHE_CONTROLLER_SOCKET_COMMON_H +- +-/* +-#define RBDSC_REGISTER 0X11 +-#define RBDSC_READ 0X12 +-#define RBDSC_LOOKUP 0X13 +-#define RBDSC_REGISTER_REPLY 0X14 +-#define RBDSC_READ_REPLY 0X15 +-#define RBDSC_LOOKUP_REPLY 0X16 +-#define RBDSC_READ_RADOS 0X17 +-*/ +- +-namespace rbd { +-namespace cache { +- +-static const int RBDSC_REGISTER = 0X11; +-static const int RBDSC_READ = 0X12; +-static const int RBDSC_LOOKUP = 0X13; +-static const int RBDSC_REGISTER_REPLY = 0X14; +-static const int RBDSC_READ_REPLY = 0X15; +-static const int RBDSC_LOOKUP_REPLY = 0X16; +-static const int RBDSC_READ_RADOS = 0X17; +- +- +- +-typedef std::function<void(uint64_t, std::string)> ProcessMsg; +-typedef std::function<void(std::string)> ClientProcessMsg; +-typedef uint8_t rbdsc_req_type; +-struct rbdsc_req_type_t { +- rbdsc_req_type type; +- uint64_t vol_size; +- uint64_t offset; +- uint64_t length; +- char pool_name[256]; +- char vol_name[256]; +- +- uint64_t size() { +- return sizeof(rbdsc_req_type_t); +- } +- +- std::string to_buffer() { +- std::stringstream ss; +- ss << type; +- ss << vol_size; +- ss << offset; +- ss << length; +- ss << pool_name; +- ss << vol_name; +- +- return ss.str(); +- } +-}; +- +-static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); +- +-} // namespace cache +-} // namespace rbd +-#endif +diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc +deleted file mode 100644 +index 99f90d6..0000000 +--- a/src/tools/rbd_cache/ObjectCacheStore.cc ++++ /dev/null +@@ -1,172 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "ObjectCacheStore.h" +- +-#define dout_context g_ceph_context +-#define dout_subsys ceph_subsys_rbd_cache +-#undef dout_prefix +-#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ +- << __func__ << ": " +- +-namespace rbd { +-namespace cache { +- +-ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) +- : m_cct(cct), m_work_queue(work_queue), +- m_rados(new librados::Rados()) { +- +- uint64_t object_cache_entries = +- cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); +- +- //TODO(): allow to set level +- m_policy = new SimplePolicy(object_cache_entries, 0.5); +-} +- +-ObjectCacheStore::~ObjectCacheStore() { +- delete m_policy; +-} +- +-int ObjectCacheStore::init(bool reset) { +- +- int ret = m_rados->init_with_context(m_cct); +- if(ret < 0) { +- lderr(m_cct) << "fail to init Ceph context" << dendl; +- return ret; +- } +- +- ret = m_rados->connect(); +- if(ret < 0 ) { +- lderr(m_cct) << "fail to conect to cluster" << dendl; +- return ret; +- } +- +- std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); +- //TODO(): check and reuse existing cache objects +- if(reset) { +- std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; +- //TODO(): to use std::filesystem +- int r = system(cmd.c_str()); +- } +- +- evict_thd = new std::thread([this]{this->evict_thread_body();}); +- return ret; +-} +- +-int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { +- int ret = 0; +- std::string cache_file_name = pool_name + object_name; +- +- //TODO(): lock on ioctx map +- if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { +- librados::IoCtx* io_ctx = new librados::IoCtx(); +- ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); +- if (ret < 0) { +- lderr(m_cct) << "fail to create ioctx" << dendl; +- assert(0); +- } +- m_ioctxs.emplace(pool_name, io_ctx); +- } +- +- assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); +- +- librados::IoCtx* ioctx = m_ioctxs[pool_name]; +- +- librados::bufferlist* read_buf = new librados::bufferlist(); +- int object_size = 4096*1024; //TODO(): read config from image metadata +- +- //TODO(): async promote +- ret = promote_object(ioctx, object_name, read_buf, object_size); +- if (ret == -ENOENT) { +- read_buf->append(std::string(object_size, '0')); +- ret = 0; +- } +- +- if( ret < 0) { +- lderr(m_cct) << "fail to read from rados" << dendl; +- return ret; +- } +- +- // persistent to cache +- librbd::cache::SyncFile cache_file(m_cct, cache_file_name); +- cache_file.open(); +- ret = cache_file.write_object_to_file(*read_buf, object_size); +- +- // update metadata +- assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); +- m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); +- assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); +- +- return ret; +- +-} +- +-// return -1, client need to read data from cluster. +-// return 0, client directly read data from cache. +-int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { +- +- std::string cache_file_name = pool_name + object_name; +- +- CACHESTATUS ret; +- ret = m_policy->lookup_object(cache_file_name); +- +- switch(ret) { +- case OBJ_CACHE_NONE: +- return do_promote(pool_name, object_name); +- case OBJ_CACHE_PROMOTED: +- return 0; +- case OBJ_CACHE_PROMOTING: +- default: +- return -1; +- } +-} +- +-void ObjectCacheStore::evict_thread_body() { +- int ret; +- while(m_evict_go) { +- ret = evict_objects(); +- } +-} +- +- +-int ObjectCacheStore::shutdown() { +- m_evict_go = false; +- evict_thd->join(); +- m_rados->shutdown(); +- return 0; +-} +- +-int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { +- return 0; +-} +- +-int ObjectCacheStore::lock_cache(std::string vol_name) { +- return 0; +-} +- +-int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { +- int ret; +- +- librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); +- +- ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); +- if(ret < 0) { +- lderr(m_cct) << "fail to read from rados" << dendl; +- return ret; +- } +- read_completion->wait_for_complete(); +- ret = read_completion->get_return_value(); +- return ret; +- +-} +- +-int ObjectCacheStore::evict_objects() { +- std::list<std::string> obj_list; +- m_policy->get_evict_list(&obj_list); +- for (auto& obj: obj_list) { +- //do_evict(obj); +- } +-} +- +-} // namespace cache +-} // namespace rbd +diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h +deleted file mode 100644 +index ba0e1f1..0000000 +--- a/src/tools/rbd_cache/ObjectCacheStore.h ++++ /dev/null +@@ -1,70 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#ifndef OBJECT_CACHE_STORE_H +-#define OBJECT_CACHE_STORE_H +- +-#include "common/debug.h" +-#include "common/errno.h" +-#include "common/ceph_context.h" +-#include "common/Mutex.h" +-#include "include/rados/librados.hpp" +-#include "include/rbd/librbd.h" +-#include "librbd/ImageCtx.h" +-#include "librbd/ImageState.h" +-#include "librbd/cache/SharedPersistentObjectCacherFile.h" +-#include "SimplePolicy.hpp" +- +- +-using librados::Rados; +-using librados::IoCtx; +- +-namespace rbd { +-namespace cache { +- +-typedef shared_ptr<librados::Rados> RadosRef; +-typedef shared_ptr<librados::IoCtx> IoCtxRef; +- +-class ObjectCacheStore +-{ +- public: +- ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); +- ~ObjectCacheStore(); +- +- int init(bool reset); +- +- int shutdown(); +- +- int lookup_object(std::string pool_name, std::string object_name); +- +- int init_cache(std::string vol_name, uint64_t vol_size); +- +- int lock_cache(std::string vol_name); +- +- private: +- void evict_thread_body(); +- int evict_objects(); +- +- int do_promote(std::string pool_name, std::string object_name); +- +- int promote_object(librados::IoCtx*, std::string object_name, +- librados::bufferlist* read_buf, +- uint64_t length); +- +- CephContext *m_cct; +- ContextWQ* m_work_queue; +- RadosRef m_rados; +- +- +- std::map<std::string, librados::IoCtx*> m_ioctxs; +- +- librbd::cache::SyncFile *m_cache_file; +- +- Policy* m_policy; +- std::thread* evict_thd; +- bool m_evict_go = false; +-}; +- +-} // namespace rbd +-} // namespace cache +-#endif +diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp +deleted file mode 100644 +index 711e3bd..0000000 +--- a/src/tools/rbd_cache/Policy.hpp ++++ /dev/null +@@ -1,30 +0,0 @@ +-#ifndef RBD_CACHE_POLICY_HPP +-#define RBD_CACHE_POLICY_HPP +- +-#include <list> +-#include <string> +- +-namespace rbd { +-namespace cache { +- +-enum CACHESTATUS { +- OBJ_CACHE_NONE = 0, +- OBJ_CACHE_PROMOTING, +- OBJ_CACHE_PROMOTED, +-}; +- +- +-class Policy { +-public: +- Policy(){} +- virtual ~Policy(){}; +- virtual CACHESTATUS lookup_object(std::string) = 0; +- virtual int evict_object(std::string&) = 0; +- virtual void update_status(std::string, CACHESTATUS) = 0; +- virtual CACHESTATUS get_status(std::string) = 0; +- virtual void get_evict_list(std::list<std::string>* obj_list) = 0; +-}; +- +-} // namespace cache +-} // namespace rbd +-#endif +diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp +deleted file mode 100644 +index e785de1..0000000 +--- a/src/tools/rbd_cache/SimplePolicy.hpp ++++ /dev/null +@@ -1,160 +0,0 @@ +-#ifndef RBD_CACHE_SIMPLE_POLICY_HPP +-#define RBD_CACHE_SIMPLE_POLICY_HPP +- +-#include "Policy.hpp" +-#include "include/lru.h" +-#include "common/RWLock.h" +-#include "common/Mutex.h" +- +-#include <vector> +-#include <unordered_map> +-#include <string> +- +-namespace rbd { +-namespace cache { +- +- +-class SimplePolicy : public Policy { +-public: +- SimplePolicy(uint64_t block_num, float watermark) +- : m_watermark(watermark), m_entry_count(block_num), +- m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), +- m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") +- { +- +- for(uint64_t i = 0; i < m_entry_count; i++) { +- m_free_list.push_back(new Entry()); +- } +- +- } +- +- ~SimplePolicy() { +- for(uint64_t i = 0; i < m_entry_count; i++) { +- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); +- delete entry; +- m_free_list.pop_front(); +- } +- } +- +- CACHESTATUS lookup_object(std::string cache_file_name) { +- +- //TODO(): check race condition +- RWLock::WLocker wlocker(m_cache_map_lock); +- +- auto entry_it = m_cache_map.find(cache_file_name); +- if(entry_it == m_cache_map.end()) { +- Mutex::Locker locker(m_free_list_lock); +- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); +- assert(entry != nullptr); +- m_free_list.pop_front(); +- entry->status = OBJ_CACHE_PROMOTING; +- +- m_cache_map[cache_file_name] = entry; +- +- return OBJ_CACHE_NONE; +- } +- +- Entry* entry = entry_it->second; +- +- if(entry->status == OBJ_CACHE_PROMOTED) { +- // touch it +- m_promoted_lru.lru_touch(entry); +- } +- +- return entry->status; +- } +- +- int evict_object(std::string& out_cache_file_name) { +- RWLock::WLocker locker(m_cache_map_lock); +- +- return 1; +- } +- +- // TODO(): simplify the logic +- void update_status(std::string file_name, CACHESTATUS new_status) { +- RWLock::WLocker locker(m_cache_map_lock); +- +- Entry* entry; +- auto entry_it = m_cache_map.find(file_name); +- +- // just check. +- if(new_status == OBJ_CACHE_PROMOTING) { +- assert(entry_it == m_cache_map.end()); +- } +- +- assert(entry_it != m_cache_map.end()); +- +- entry = entry_it->second; +- +- // promoting is done, so update it. +- if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { +- m_promoted_lru.lru_insert_top(entry); +- entry->status = new_status; +- return; +- } +- +- assert(0); +- } +- +- // get entry status +- CACHESTATUS get_status(std::string file_name) { +- RWLock::RLocker locker(m_cache_map_lock); +- auto entry_it = m_cache_map.find(file_name); +- if(entry_it == m_cache_map.end()) { +- return OBJ_CACHE_NONE; +- } +- +- return entry_it->second->status; +- } +- +- void get_evict_list(std::list<std::string>* obj_list) { +- RWLock::WLocker locker(m_cache_map_lock); +- // check free ratio, pop entries from LRU +- if (m_free_list.size() / m_entry_count < m_watermark) { +- int evict_num = 10; //TODO(): make this configurable +- for(int i = 0; i < evict_num; i++) { +- Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); +- if (entry == nullptr) { +- continue; +- } +- std::string file_name = entry->cache_file_name; +- obj_list->push_back(file_name); +- +- auto entry_it = m_cache_map.find(file_name); +- m_cache_map.erase(entry_it); +- +- //mark this entry as free +- entry->status = OBJ_CACHE_NONE; +- Mutex::Locker locker(m_free_list_lock); +- m_free_list.push_back(entry); +- } +- } +- } +- +-private: +- +- class Entry : public LRUObject { +- public: +- CACHESTATUS status; +- Entry() : status(OBJ_CACHE_NONE){} +- std::string cache_file_name; +- void encode(bufferlist &bl){} +- void decode(bufferlist::iterator &it){} +- }; +- +- float m_watermark; +- uint64_t m_entry_count; +- +- std::unordered_map<std::string, Entry*> m_cache_map; +- RWLock m_cache_map_lock; +- +- std::deque<Entry*> m_free_list; +- Mutex m_free_list_lock; +- +- LRU m_promoted_lru; // include promoted, using status. +- +-}; +- +-} // namespace cache +-} // namespace rbd +-#endif +diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc +deleted file mode 100644 +index d604760..0000000 +--- a/src/tools/rbd_cache/main.cc ++++ /dev/null +@@ -1,85 +0,0 @@ +-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +-// vim: ts=8 sw=2 smarttab +- +-#include "common/ceph_argparse.h" +-#include "common/config.h" +-#include "common/debug.h" +-#include "common/errno.h" +-#include "global/global_init.h" +-#include "global/signal_handler.h" +-#include "CacheController.h" +- +-#include <vector> +- +-rbd::cache::CacheController *cachectl = nullptr; +- +-void usage() { +- std::cout << "usage: cache controller [options...]" << std::endl; +- std::cout << "options:\n"; +- std::cout << " -m monaddress[:port] connect to specified monitor\n"; +- std::cout << " --keyring=<path> path to keyring for local cluster\n"; +- std::cout << " --log-file=<logfile> file to log debug output\n"; +- std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; +- generic_server_usage(); +-} +- +-static void handle_signal(int signum) +-{ +- if (cachectl) +- cachectl->handle_signal(signum); +-} +- +-int main(int argc, const char **argv) +-{ +- std::vector<const char*> args; +- env_to_vec(args); +- argv_to_vec(argc, argv, args); +- +- auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, +- CODE_ENVIRONMENT_DAEMON, +- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); +- +- for (auto i = args.begin(); i != args.end(); ++i) { +- if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { +- usage(); +- return EXIT_SUCCESS; +- } +- } +- +- if (g_conf()->daemonize) { +- global_init_daemonize(g_ceph_context); +- } +- g_ceph_context->enable_perf_counter(); +- +- common_init_finish(g_ceph_context); +- +- init_async_signal_handler(); +- register_async_signal_handler(SIGHUP, sighup_handler); +- register_async_signal_handler_oneshot(SIGINT, handle_signal); +- register_async_signal_handler_oneshot(SIGTERM, handle_signal); +- +- std::vector<const char*> cmd_args; +- argv_to_vec(argc, argv, cmd_args); +- +- // disable unnecessary librbd cache +- g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); +- +- cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args); +- int r = cachectl->init(); +- if (r < 0) { +- std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; +- goto cleanup; +- } +- +- cachectl->run(); +- +- cleanup: +- unregister_async_signal_handler(SIGHUP, sighup_handler); +- unregister_async_signal_handler(SIGINT, handle_signal); +- unregister_async_signal_handler(SIGTERM, handle_signal); +- shutdown_async_signal_handler(); +- +- delete cachectl; +- +- return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; +-} +-- +2.7.4 + |