diff options
Diffstat (limited to 'src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch')
-rw-r--r-- | src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch | 3510 |
1 files changed, 0 insertions, 3510 deletions
diff --git a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch deleted file mode 100644 index 46040e6..0000000 --- a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch +++ /dev/null @@ -1,3510 +0,0 @@ -From 9a097084d06e7186206b43d9c81d1f648791d7a4 Mon Sep 17 00:00:00 2001 -From: Yuan Zhou <yuan.zhou@intel.com> -Date: Fri, 7 Sep 2018 08:29:51 +0800 -Subject: [PATCH 10/10] librbd: new namespace ceph immutable obj cache - -clean up class/func names to use the new namespace - -Signed-off-by: Yuan Zhou <yuan.zhou@intel.com> ---- - src/common/options.cc | 2 +- - src/common/subsys.h | 3 +- - src/librbd/CMakeLists.txt | 3 +- - .../SharedPersistentObjectCacherObjectDispatch.cc | 175 ---------------- - .../SharedPersistentObjectCacherObjectDispatch.h | 133 ------------ - src/librbd/cache/SharedReadOnlyObjectDispatch.cc | 170 +++++++++++++++ - src/librbd/cache/SharedReadOnlyObjectDispatch.h | 126 ++++++++++++ - src/librbd/image/OpenRequest.cc | 4 +- - src/tools/CMakeLists.txt | 2 +- - .../ceph_immutable_object_cache/CMakeLists.txt | 11 + - .../ceph_immutable_object_cache/CacheClient.cc | 205 ++++++++++++++++++ - .../ceph_immutable_object_cache/CacheClient.h | 53 +++++ - .../ceph_immutable_object_cache/CacheController.cc | 117 +++++++++++ - .../ceph_immutable_object_cache/CacheController.h | 53 +++++ - .../ceph_immutable_object_cache/CacheServer.cc | 99 +++++++++ - .../ceph_immutable_object_cache/CacheServer.h | 54 +++++ - .../ceph_immutable_object_cache/CacheSession.cc | 115 +++++++++++ - .../ceph_immutable_object_cache/CacheSession.h | 58 ++++++ - .../ObjectCacheStore.cc | 172 ++++++++++++++++ - .../ceph_immutable_object_cache/ObjectCacheStore.h | 70 +++++++ - src/tools/ceph_immutable_object_cache/Policy.hpp | 33 +++ - .../ceph_immutable_object_cache/SimplePolicy.hpp | 163 +++++++++++++++ - .../ceph_immutable_object_cache/SocketCommon.h | 54 +++++ - src/tools/ceph_immutable_object_cache/main.cc | 85 ++++++++ - src/tools/rbd_cache/CMakeLists.txt | 9 - - src/tools/rbd_cache/CacheController.cc | 116 ----------- - src/tools/rbd_cache/CacheController.h | 54 ----- - src/tools/rbd_cache/CacheControllerSocket.hpp | 228 -------------------- - .../rbd_cache/CacheControllerSocketClient.hpp | 229 --------------------- - src/tools/rbd_cache/CacheControllerSocketCommon.h | 62 ------ - src/tools/rbd_cache/ObjectCacheStore.cc | 172 ---------------- - src/tools/rbd_cache/ObjectCacheStore.h | 70 ------- - src/tools/rbd_cache/Policy.hpp | 30 --- - src/tools/rbd_cache/SimplePolicy.hpp | 160 -------------- - src/tools/rbd_cache/main.cc | 85 -------- - 35 files changed, 1646 insertions(+), 1529 deletions(-) - delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc - delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h - create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.cc - create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.h - create mode 100644 src/tools/ceph_immutable_object_cache/CMakeLists.txt - create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.cc - create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.h - create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.cc - create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.h - create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.cc - create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.h - create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.cc - create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.h - create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc - create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.h - create mode 100644 src/tools/ceph_immutable_object_cache/Policy.hpp - create mode 100644 src/tools/ceph_immutable_object_cache/SimplePolicy.hpp - create mode 100644 src/tools/ceph_immutable_object_cache/SocketCommon.h - create mode 100644 src/tools/ceph_immutable_object_cache/main.cc - delete mode 100644 src/tools/rbd_cache/CMakeLists.txt - delete mode 100644 src/tools/rbd_cache/CacheController.cc - delete mode 100644 src/tools/rbd_cache/CacheController.h - delete mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp - delete mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp - delete mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h - delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc - delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.h - delete mode 100644 src/tools/rbd_cache/Policy.hpp - delete mode 100644 src/tools/rbd_cache/SimplePolicy.hpp - delete mode 100644 src/tools/rbd_cache/main.cc - -diff --git a/src/common/options.cc b/src/common/options.cc -index 3172744..bf00aab 100644 ---- a/src/common/options.cc -+++ b/src/common/options.cc -@@ -6358,7 +6358,7 @@ static std::vector<Option> get_rbd_options() { - .set_description("time in seconds for detecting a hung thread"), - - Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED) -- .set_default(true) -+ .set_default(false) - .set_description("whether to enable shared ssd caching"), - - Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED) -diff --git a/src/common/subsys.h b/src/common/subsys.h -index bdd2d0e..5b532c1 100644 ---- a/src/common/subsys.h -+++ b/src/common/subsys.h -@@ -36,9 +36,10 @@ SUBSYS(objecter, 0, 1) - SUBSYS(rados, 0, 5) - SUBSYS(rbd, 0, 5) - SUBSYS(rbd_mirror, 0, 5) --SUBSYS(rbd_replay, 0, 5) - SUBSYS(journaler, 0, 5) - SUBSYS(objectcacher, 0, 5) -+SUBSYS(immutable_obj_cache, 0, 5) -+SUBSYS(rbd_replay, 0, 5) - SUBSYS(client, 0, 5) - SUBSYS(osd, 1, 5) - SUBSYS(optracker, 0, 5) -diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt -index 540ee78..c9bfb6f 100644 ---- a/src/librbd/CMakeLists.txt -+++ b/src/librbd/CMakeLists.txt -@@ -32,7 +32,7 @@ set(librbd_internal_srcs - api/Snapshot.cc - cache/ImageWriteback.cc - cache/ObjectCacherObjectDispatch.cc -- cache/SharedPersistentObjectCacherObjectDispatch.cc -+ cache/SharedReadOnlyObjectDispatch.cc - cache/SharedPersistentObjectCacher.cc - cache/SharedPersistentObjectCacherFile.cc - deep_copy/ImageCopyRequest.cc -@@ -125,6 +125,7 @@ set(librbd_internal_srcs - trash/MoveRequest.cc - watcher/Notifier.cc - watcher/RewatchRequest.cc -+ ${CMAKE_SOURCE_DIR}/src/tools/ceph_immutable_object_cache/CacheClient.cc - ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc) - - add_library(rbd_api STATIC librbd.cc) -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -deleted file mode 100644 -index 7cbc019..0000000 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc -+++ /dev/null -@@ -1,175 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h" --#include "common/WorkQueue.h" --#include "librbd/ImageCtx.h" --#include "librbd/Journal.h" --#include "librbd/Utils.h" --#include "librbd/LibrbdWriteback.h" --#include "librbd/io/ObjectDispatchSpec.h" --#include "librbd/io/ObjectDispatcher.h" --#include "librbd/io/Utils.h" --#include "osd/osd_types.h" --#include "osdc/WritebackHandler.h" --#include <vector> -- --#define dout_subsys ceph_subsys_rbd --#undef dout_prefix --#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \ -- << this << " " << __func__ << ": " -- --namespace librbd { --namespace cache { -- --template <typename I> --SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch( -- I* image_ctx) : m_image_ctx(image_ctx) { --} -- --template <typename I> --SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() { -- delete m_object_store; -- delete m_cache_client; --} -- --// TODO if connect fails, init will return error to high layer. --template <typename I> --void SharedPersistentObjectCacherObjectDispatch<I>::init() { -- auto cct = m_image_ctx->cct; -- ldout(cct, 5) << dendl; -- -- if (m_image_ctx->parent != nullptr) { -- //TODO(): should we cover multi-leveled clone? -- ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; -- return; -- } -- -- ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; -- -- std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); -- m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(), -- ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); -- -- int ret = m_cache_client->connect(); -- if (ret < 0) { -- ldout(cct, 5) << "SRO cache client fail to connect with local controller: " -- << "please start rbd-cache daemon" -- << dendl; -- } else { -- ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " -- << "name = " << m_image_ctx->id -- << dendl; -- -- ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), -- m_image_ctx->id, m_image_ctx->size); -- -- if (ret >= 0) { -- // add ourself to the IO object dispatcher chain -- m_image_ctx->io_object_dispatcher->register_object_dispatch(this); -- } -- } --} -- --template <typename I> --bool SharedPersistentObjectCacherObjectDispatch<I>::read( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- uint64_t object_len, librados::snap_t snap_id, int op_flags, -- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -- io::ExtentMap* extent_map, int* object_dispatch_flags, -- io::DispatchResult* dispatch_result, Context** on_finish, -- Context* on_dispatched) { -- -- // IO chained in reverse order -- -- // Now, policy is : when session have any error, later all read will dispatched to rados layer. -- if(!m_cache_client->is_session_work()) { -- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -- on_dispatched->complete(0); -- return true; -- // TODO : domain socket have error, all read operation will dispatched to rados layer. -- } -- -- auto cct = m_image_ctx->cct; -- ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" -- << object_len << dendl; -- -- -- on_dispatched = util::create_async_context_callback(*m_image_ctx, -- on_dispatched); -- auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) { -- handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched); -- }); -- -- if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { -- m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), -- m_image_ctx->id, oid, ctx); -- } -- return true; --} -- --template <typename I> --int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache( -- bool cache, -- const std::string &oid, uint64_t object_off, uint64_t object_len, -- ceph::bufferlist* read_data, io::DispatchResult* dispatch_result, -- Context* on_dispatched) { -- // IO chained in reverse order -- auto cct = m_image_ctx->cct; -- ldout(cct, 20) << dendl; -- -- // try to read from parent image -- if (cache) { -- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -- //int r = object_len; -- if (r != 0) { -- *dispatch_result = io::DISPATCH_RESULT_COMPLETE; -- //TODO(): complete in syncfile -- on_dispatched->complete(r); -- ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl; -- return true; -- } -- } else { -- *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -- on_dispatched->complete(0); -- ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl; -- return false; -- } --} --template <typename I> --void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) { -- auto cct = m_image_ctx->cct; -- ldout(cct, 20) << dendl; -- -- rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str()); -- -- switch (io_ctx->type) { -- case rbd::cache::RBDSC_REGISTER_REPLY: { -- // open cache handler for volume -- ldout(cct, 20) << "SRO cache client open cache handler" << dendl; -- m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); -- -- break; -- } -- case rbd::cache::RBDSC_READ_REPLY: { -- ldout(cct, 20) << "SRO cache client start to read cache" << dendl; -- //TODO(): should call read here -- -- break; -- } -- case rbd::cache::RBDSC_READ_RADOS: { -- ldout(cct, 20) << "SRO cache client start to read rados" << dendl; -- //TODO(): should call read here -- -- break; -- } -- default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; -- break; -- -- } --} -- --} // namespace cache --} // namespace librbd -- --template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; -diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -deleted file mode 100644 -index 5685244..0000000 ---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h -+++ /dev/null -@@ -1,133 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H --#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -- --#include "librbd/io/ObjectDispatchInterface.h" --#include "common/Mutex.h" --#include "osdc/ObjectCacher.h" --#include "tools/rbd_cache/CacheControllerSocketClient.hpp" --#include "SharedPersistentObjectCacher.h" -- --struct WritebackHandler; -- --namespace librbd { -- --class ImageCtx; -- --namespace cache { -- --/** -- * Facade around the OSDC object cacher to make it align with -- * the object dispatcher interface -- */ --template <typename ImageCtxT = ImageCtx> --class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface { --public: -- static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) { -- return new SharedPersistentObjectCacherObjectDispatch(image_ctx); -- } -- -- SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx); -- ~SharedPersistentObjectCacherObjectDispatch() override; -- -- io::ObjectDispatchLayer get_object_dispatch_layer() const override { -- return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; -- } -- -- void init(); -- void shut_down(Context* on_finish) { -- m_image_ctx->op_work_queue->queue(on_finish, 0); -- } -- -- bool read( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- uint64_t object_len, librados::snap_t snap_id, int op_flags, -- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -- io::ExtentMap* extent_map, int* object_dispatch_flags, -- io::DispatchResult* dispatch_result, Context** on_finish, -- Context* on_dispatched) override; -- -- bool discard( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- uint64_t object_len, const ::SnapContext &snapc, int discard_flags, -- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -- uint64_t* journal_tid, io::DispatchResult* dispatch_result, -- Context** on_finish, Context* on_dispatched) { -- return false; -- } -- -- bool write( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -- uint64_t* journal_tid, io::DispatchResult* dispatch_result, -- Context** on_finish, Context* on_dispatched) { -- return false; -- } -- -- bool write_same( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- uint64_t object_len, io::Extents&& buffer_extents, -- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -- const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -- uint64_t* journal_tid, io::DispatchResult* dispatch_result, -- Context** on_finish, Context* on_dispatched) { -- return false; -- } -- -- bool compare_and_write( -- const std::string &oid, uint64_t object_no, uint64_t object_off, -- ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, -- const ::SnapContext &snapc, int op_flags, -- const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, -- int* object_dispatch_flags, uint64_t* journal_tid, -- io::DispatchResult* dispatch_result, Context** on_finish, -- Context* on_dispatched) { -- return false; -- } -- -- bool flush( -- io::FlushSource flush_source, const ZTracer::Trace &parent_trace, -- io::DispatchResult* dispatch_result, Context** on_finish, -- Context* on_dispatched) { -- return false; -- } -- -- bool invalidate_cache(Context* on_finish) { -- return false; -- } -- -- bool reset_existence_cache(Context* on_finish) { -- return false; -- } -- -- void extent_overwritten( -- uint64_t object_no, uint64_t object_off, uint64_t object_len, -- uint64_t journal_tid, uint64_t new_journal_tid) { -- } -- -- SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; -- --private: -- -- int handle_read_cache( -- bool cache, -- const std::string &oid, uint64_t object_off, -- uint64_t object_len, ceph::bufferlist* read_data, -- io::DispatchResult* dispatch_result, -- Context* on_dispatched); -- void client_handle_request(std::string msg); -- -- ImageCtxT* m_image_ctx; -- -- rbd::cache::CacheClient *m_cache_client = nullptr; --}; -- --} // namespace cache --} // namespace librbd -- --extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>; -- --#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H -diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.cc b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc -new file mode 100644 -index 0000000..23c7dbe ---- /dev/null -+++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc -@@ -0,0 +1,170 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/WorkQueue.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/Journal.h" -+#include "librbd/Utils.h" -+#include "librbd/LibrbdWriteback.h" -+#include "librbd/io/ObjectDispatchSpec.h" -+#include "librbd/io/ObjectDispatcher.h" -+#include "librbd/io/Utils.h" -+#include "librbd/cache/SharedReadOnlyObjectDispatch.h" -+#include "osd/osd_types.h" -+#include "osdc/WritebackHandler.h" -+ -+#include <vector> -+ -+#define dout_subsys ceph_subsys_rbd -+#undef dout_prefix -+#define dout_prefix *_dout << "librbd::cache::SharedReadOnlyObjectDispatch: " \ -+ << this << " " << __func__ << ": " -+ -+namespace librbd { -+namespace cache { -+ -+template <typename I> -+SharedReadOnlyObjectDispatch<I>::SharedReadOnlyObjectDispatch( -+ I* image_ctx) : m_image_ctx(image_ctx) { -+} -+ -+template <typename I> -+SharedReadOnlyObjectDispatch<I>::~SharedReadOnlyObjectDispatch() { -+ delete m_object_store; -+ delete m_cache_client; -+} -+ -+// TODO if connect fails, init will return error to high layer. -+template <typename I> -+void SharedReadOnlyObjectDispatch<I>::init() { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 5) << dendl; -+ -+ if (m_image_ctx->parent != nullptr) { -+ //TODO(): should we cover multi-leveled clone? -+ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl; -+ return; -+ } -+ -+ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl; -+ -+ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock"); -+ m_cache_client = new ceph::immutable_obj_cache::CacheClient(controller_path.c_str(), -+ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct); -+ -+ int ret = m_cache_client->connect(); -+ if (ret < 0) { -+ ldout(cct, 5) << "SRO cache client fail to connect with local controller: " -+ << "please start rbd-cache daemon" -+ << dendl; -+ } else { -+ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: " -+ << "name = " << m_image_ctx->id -+ << dendl; -+ -+ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(), -+ m_image_ctx->id, m_image_ctx->size); -+ -+ if (ret >= 0) { -+ // add ourself to the IO object dispatcher chain -+ m_image_ctx->io_object_dispatcher->register_object_dispatch(this); -+ } -+ } -+} -+ -+template <typename I> -+bool SharedReadOnlyObjectDispatch<I>::read( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, librados::snap_t snap_id, int op_flags, -+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -+ io::ExtentMap* extent_map, int* object_dispatch_flags, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~" -+ << object_len << dendl; -+ -+ // if any session fails, later reads will go to rados -+ if(!m_cache_client->is_session_work()) { -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ return true; -+ // TODO(): fix domain socket error -+ } -+ -+ auto ctx = new FunctionContext([this, oid, object_off, object_len, -+ read_data, dispatch_result, on_dispatched](bool cache) { -+ handle_read_cache(cache, oid, object_off, object_len, -+ read_data, dispatch_result, on_dispatched); -+ }); -+ -+ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) { -+ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(), -+ m_image_ctx->id, oid, ctx); -+ } -+ return true; -+} -+ -+template <typename I> -+int SharedReadOnlyObjectDispatch<I>::handle_read_cache( -+ bool cache, const std::string &oid, uint64_t object_off, -+ uint64_t object_len, ceph::bufferlist* read_data, -+ io::DispatchResult* dispatch_result, Context* on_dispatched) { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << dendl; -+ -+ // try to read from parent image -+ if (cache) { -+ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched); -+ //int r = object_len; -+ if (r != 0) { -+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE; -+ //TODO(): complete in syncfile -+ on_dispatched->complete(r); -+ ldout(cct, 20) << "read cache: " << *dispatch_result <<dendl; -+ return true; -+ } -+ } else { -+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE; -+ on_dispatched->complete(0); -+ ldout(cct, 20) << "read rados: " << *dispatch_result <<dendl; -+ return false; -+ } -+} -+template <typename I> -+void SharedReadOnlyObjectDispatch<I>::client_handle_request(std::string msg) { -+ auto cct = m_image_ctx->cct; -+ ldout(cct, 20) << dendl; -+ -+ ceph::immutable_obj_cache::rbdsc_req_type_t *io_ctx = (ceph::immutable_obj_cache::rbdsc_req_type_t*)(msg.c_str()); -+ -+ switch (io_ctx->type) { -+ case ceph::immutable_obj_cache::RBDSC_REGISTER_REPLY: { -+ // open cache handler for volume -+ ldout(cct, 20) << "SRO cache client open cache handler" << dendl; -+ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path); -+ -+ break; -+ } -+ case ceph::immutable_obj_cache::RBDSC_READ_REPLY: { -+ ldout(cct, 20) << "SRO cache client start to read cache" << dendl; -+ //TODO(): should call read here -+ -+ break; -+ } -+ case ceph::immutable_obj_cache::RBDSC_READ_RADOS: { -+ ldout(cct, 20) << "SRO cache client start to read rados" << dendl; -+ //TODO(): should call read here -+ -+ break; -+ } -+ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl; -+ break; -+ -+ } -+} -+ -+} // namespace cache -+} // namespace librbd -+ -+template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>; -diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.h b/src/librbd/cache/SharedReadOnlyObjectDispatch.h -new file mode 100644 -index 0000000..9b56da9 ---- /dev/null -+++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.h -@@ -0,0 +1,126 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -+#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H -+ -+#include "common/Mutex.h" -+#include "SharedPersistentObjectCacher.h" -+#include "librbd/io/ObjectDispatchInterface.h" -+#include "tools/ceph_immutable_object_cache/CacheClient.h" -+ -+ -+namespace librbd { -+ -+class ImageCtx; -+ -+namespace cache { -+ -+template <typename ImageCtxT = ImageCtx> -+class SharedReadOnlyObjectDispatch : public io::ObjectDispatchInterface { -+public: -+ static SharedReadOnlyObjectDispatch* create(ImageCtxT* image_ctx) { -+ return new SharedReadOnlyObjectDispatch(image_ctx); -+ } -+ -+ SharedReadOnlyObjectDispatch(ImageCtxT* image_ctx); -+ ~SharedReadOnlyObjectDispatch() override; -+ -+ io::ObjectDispatchLayer get_object_dispatch_layer() const override { -+ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE; -+ } -+ -+ void init(); -+ void shut_down(Context* on_finish) { -+ m_image_ctx->op_work_queue->queue(on_finish, 0); -+ } -+ -+ bool read( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, librados::snap_t snap_id, int op_flags, -+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data, -+ io::ExtentMap* extent_map, int* object_dispatch_flags, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) override; -+ -+ bool discard( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool write( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool write_same( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ uint64_t object_len, io::Extents&& buffer_extents, -+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags, -+ uint64_t* journal_tid, io::DispatchResult* dispatch_result, -+ Context** on_finish, Context* on_dispatched) { -+ return false; -+ } -+ -+ bool compare_and_write( -+ const std::string &oid, uint64_t object_no, uint64_t object_off, -+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data, -+ const ::SnapContext &snapc, int op_flags, -+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset, -+ int* object_dispatch_flags, uint64_t* journal_tid, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ return false; -+ } -+ -+ bool flush( -+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace, -+ io::DispatchResult* dispatch_result, Context** on_finish, -+ Context* on_dispatched) { -+ return false; -+ } -+ -+ bool invalidate_cache(Context* on_finish) { -+ return false; -+ } -+ -+ bool reset_existence_cache(Context* on_finish) { -+ return false; -+ } -+ -+ void extent_overwritten( -+ uint64_t object_no, uint64_t object_off, uint64_t object_len, -+ uint64_t journal_tid, uint64_t new_journal_tid) { -+ } -+ -+private: -+ -+ int handle_read_cache( -+ bool cache, -+ const std::string &oid, uint64_t object_off, -+ uint64_t object_len, ceph::bufferlist* read_data, -+ io::DispatchResult* dispatch_result, -+ Context* on_dispatched); -+ void client_handle_request(std::string msg); -+ -+ ImageCtxT* m_image_ctx; -+ -+ ceph::immutable_obj_cache::CacheClient *m_cache_client = nullptr; -+ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr; -+}; -+ -+} // namespace cache -+} // namespace librbd -+ -+extern template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>; -+ -+#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H -diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc -index 30a7b66..57ce92f 100644 ---- a/src/librbd/image/OpenRequest.cc -+++ b/src/librbd/image/OpenRequest.cc -@@ -8,7 +8,7 @@ - #include "librbd/ImageCtx.h" - #include "librbd/Utils.h" - #include "librbd/cache/ObjectCacherObjectDispatch.h" --#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc" -+#include "librbd/cache/SharedReadOnlyObjectDispatch.cc" - #include "librbd/image/CloseRequest.h" - #include "librbd/image/RefreshRequest.h" - #include "librbd/image/SetSnapRequest.h" -@@ -457,7 +457,7 @@ Context *OpenRequest<I>::send_init_cache(int *result) { - // enable Shared Read-only cache for parent image - if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) { - ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl; -- auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx); -+ auto sro_cache = cache::SharedReadOnlyObjectDispatch<I>::create(m_image_ctx); - sro_cache->init(); - } - -diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt -index 72ab342..f7c5872 100644 ---- a/src/tools/CMakeLists.txt -+++ b/src/tools/CMakeLists.txt -@@ -99,7 +99,6 @@ endif(WITH_CEPHFS) - if(WITH_RBD) - add_subdirectory(rbd) - add_subdirectory(rbd_mirror) -- add_subdirectory(rbd_cache) - if(LINUX) - add_subdirectory(rbd_nbd) - endif() -@@ -108,4 +107,5 @@ if(WITH_RBD) - endif() - endif(WITH_RBD) - -+add_subdirectory(ceph_immutable_object_cache) - add_subdirectory(ceph-dencoder) -diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt -new file mode 100644 -index 0000000..c7c7af3 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CMakeLists.txt -@@ -0,0 +1,11 @@ -+add_executable(ceph-immutable-object-cache -+ ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc -+ ObjectCacheStore.cc -+ CacheController.cc -+ CacheServer.cc -+ CacheSession.cc -+ main.cc) -+target_link_libraries(ceph-immutable-object-cache -+ librados -+ global) -+install(TARGETS ceph-immutable-object-cache DESTINATION bin) -diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc -new file mode 100644 -index 0000000..a7116bf ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheClient.cc -@@ -0,0 +1,205 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "CacheClient.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \ -+ << __func__ << ": " -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+ CacheClient::CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) -+ : m_io_service_work(m_io_service), -+ m_dm_socket(m_io_service), -+ m_client_process_msg(processmsg), -+ m_ep(stream_protocol::endpoint(file)), -+ m_session_work(false), -+ cct(ceph_ctx) -+ { -+ // TODO wrapper io_service -+ std::thread thd([this](){m_io_service.run();}); -+ thd.detach(); -+ } -+ -+ void CacheClient::run(){ -+ } -+ -+ bool CacheClient::is_session_work() { -+ return m_session_work.load() == true; -+ } -+ -+ // just when error occur, call this method. -+ void CacheClient::close() { -+ m_session_work.store(false); -+ boost::system::error_code close_ec; -+ m_dm_socket.close(close_ec); -+ if(close_ec) { -+ ldout(cct, 20) << "close: " << close_ec.message() << dendl; -+ } -+ ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl; -+ } -+ -+ int CacheClient::connect() { -+ boost::system::error_code ec; -+ m_dm_socket.connect(m_ep, ec); -+ if(ec) { -+ if(ec == boost::asio::error::connection_refused) { -+ ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. " -+ << "Now data will be read from ceph cluster " << dendl; -+ } else { -+ ldout(cct, 20) << "connect: " << ec.message() << dendl; -+ } -+ -+ if(m_dm_socket.is_open()) { -+ // Set to indicate what error occurred, if any. -+ // Note that, even if the function indicates an error, -+ // the underlying descriptor is closed. -+ boost::system::error_code close_ec; -+ m_dm_socket.close(close_ec); -+ if(close_ec) { -+ ldout(cct, 20) << "close: " << close_ec.message() << dendl; -+ } -+ } -+ return -1; -+ } -+ -+ ldout(cct, 20) <<"connect success"<< dendl; -+ -+ return 0; -+ } -+ -+ int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { -+ // cache controller will init layout -+ rbdsc_req_type_t *message = new rbdsc_req_type_t(); -+ message->type = RBDSC_REGISTER; -+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -+ memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); -+ message->vol_size = vol_size; -+ message->offset = 0; -+ message->length = 0; -+ -+ uint64_t ret; -+ boost::system::error_code ec; -+ -+ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); -+ if(ec) { -+ ldout(cct, 20) << "write fails : " << ec.message() << dendl; -+ return -1; -+ } -+ -+ if(ret != message->size()) { -+ ldout(cct, 20) << "write fails : ret != send_bytes " << dendl; -+ return -1; -+ } -+ -+ // hard code TODO -+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); -+ if(ec == boost::asio::error::eof) { -+ ldout(cct, 20) << "recv eof" << dendl; -+ return -1; -+ } -+ -+ if(ec) { -+ ldout(cct, 20) << "write fails : " << ec.message() << dendl; -+ return -1; -+ } -+ -+ if(ret != RBDSC_MSG_LEN) { -+ ldout(cct, 20) << "write fails : ret != receive bytes " << dendl; -+ return -1; -+ } -+ -+ m_client_process_msg(std::string(m_recv_buffer, ret)); -+ -+ delete message; -+ -+ ldout(cct, 20) << "register volume success" << dendl; -+ -+ // TODO -+ m_session_work.store(true); -+ -+ return 0; -+ } -+ -+ // if occur any error, we just return false. Then read from rados. -+ int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { -+ rbdsc_req_type_t *message = new rbdsc_req_type_t(); -+ message->type = RBDSC_READ; -+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -+ memcpy(message->vol_name, object_id.c_str(), object_id.size()); -+ message->vol_size = 0; -+ message->offset = 0; -+ message->length = 0; -+ -+ boost::asio::async_write(m_dm_socket, -+ boost::asio::buffer((char*)message, message->size()), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ [this, on_finish, message](const boost::system::error_code& err, size_t cb) { -+ delete message; -+ if(err) { -+ ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ if(cb != RBDSC_MSG_LEN) { -+ ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ get_result(on_finish); -+ }); -+ -+ return 0; -+ } -+ -+ void CacheClient::get_result(Context* on_finish) { -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ [this, on_finish](const boost::system::error_code& err, size_t cb) { -+ if(err == boost::asio::error::eof) { -+ ldout(cct, 20) << "get_result: ack is EOF." << dendl; -+ close(); -+ on_finish->complete(false); -+ return; -+ } -+ if(err) { -+ ldout(cct, 20) << "get_result: async_read fails:" << err.message() << dendl; -+ close(); -+ on_finish->complete(false); // TODO replace this assert with some metohds. -+ return; -+ } -+ if (cb != RBDSC_MSG_LEN) { -+ close(); -+ ldout(cct, 20) << "get_result: in-complete ack." << dendl; -+ on_finish->complete(false); // TODO: replace this assert with some methods. -+ } -+ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); -+ -+ // TODO: re-occur yuan's bug -+ if(io_ctx->type == RBDSC_READ) { -+ ldout(cct, 20) << "get rbdsc_read... " << dendl; -+ assert(0); -+ } -+ -+ if (io_ctx->type == RBDSC_READ_REPLY) { -+ on_finish->complete(true); -+ return; -+ } else { -+ on_finish->complete(false); -+ return; -+ } -+ }); -+ } -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h -new file mode 100644 -index 0000000..d82ab8f ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheClient.h -@@ -0,0 +1,53 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_CLIENT_H -+#define CEPH_CACHE_CLIENT_H -+ -+#include <atomic> -+#include <boost/asio.hpp> -+#include <boost/bind.hpp> -+#include <boost/asio/error.hpp> -+#include <boost/algorithm/string.hpp> -+#include "librbd/ImageCtx.h" -+#include "include/assert.h" -+#include "include/Context.h" -+#include "SocketCommon.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class CacheClient { -+public: -+ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx); -+ void run(); -+ bool is_session_work(); -+ -+ void close(); -+ int connect(); -+ -+ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size); -+ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish); -+ void get_result(Context* on_finish); -+ -+private: -+ boost::asio::io_service m_io_service; -+ boost::asio::io_service::work m_io_service_work; -+ stream_protocol::socket m_dm_socket; -+ ClientProcessMsg m_client_process_msg; -+ stream_protocol::endpoint m_ep; -+ char m_recv_buffer[1024]; -+ -+ // atomic modfiy for this variable. -+ // thread 1 : asio callback thread modify it. -+ // thread 2 : librbd read it. -+ std::atomic<bool> m_session_work; -+ CephContext* cct; -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc -new file mode 100644 -index 0000000..cb636d2 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheController.cc -@@ -0,0 +1,117 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "CacheController.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \ -+ << __func__ << ": " -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class ThreadPoolSingleton : public ThreadPool { -+public: -+ ContextWQ *op_work_queue; -+ -+ explicit ThreadPoolSingleton(CephContext *cct) -+ : ThreadPool(cct, "ceph::cache::thread_pool", "tp_librbd_cache", 32, -+ "pcache_threads"), -+ op_work_queue(new ContextWQ("ceph::pcache_op_work_queue", -+ cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), -+ this)) { -+ start(); -+ } -+ ~ThreadPoolSingleton() override { -+ op_work_queue->drain(); -+ delete op_work_queue; -+ -+ stop(); -+ } -+}; -+ -+ -+CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): -+ m_args(args), m_cct(cct) { -+ -+} -+ -+CacheController::~CacheController() { -+ -+} -+ -+int CacheController::init() { -+ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( -+ "ceph::cache::thread_pool", false, m_cct); -+ pcache_op_work_queue = thread_pool_singleton->op_work_queue; -+ -+ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); -+ //TODO(): make this configurable -+ int r = m_object_cache_store->init(true); -+ if (r < 0) { -+ lderr(m_cct) << "init error\n" << dendl; -+ } -+ return r; -+} -+ -+int CacheController::shutdown() { -+ int r = m_object_cache_store->shutdown(); -+ return r; -+} -+ -+void CacheController::handle_signal(int signum){} -+ -+void CacheController::run() { -+ try { -+ //TODO(): use new socket path -+ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); -+ std::remove(controller_path.c_str()); -+ -+ m_cache_server = new CacheServer(controller_path, -+ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); -+ m_cache_server->run(); -+ } catch (std::exception& e) { -+ lderr(m_cct) << "Exception: " << e.what() << dendl; -+ } -+} -+ -+void CacheController::handle_request(uint64_t session_id, std::string msg){ -+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -+ -+ int ret = 0; -+ -+ switch (io_ctx->type) { -+ case RBDSC_REGISTER: { -+ // init cache layout for volume -+ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); -+ io_ctx->type = RBDSC_REGISTER_REPLY; -+ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); -+ -+ break; -+ } -+ case RBDSC_READ: { -+ // lookup object in local cache store -+ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); -+ if (ret < 0) { -+ io_ctx->type = RBDSC_READ_RADOS; -+ } else { -+ io_ctx->type = RBDSC_READ_REPLY; -+ } -+ if (io_ctx->type != RBDSC_READ_REPLY) { -+ assert(0); -+ } -+ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); -+ -+ break; -+ } -+ ldout(m_cct, 5) << "can't recongize request" << dendl; -+ assert(0); // TODO replace it. -+ } -+} -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -+ -diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h -new file mode 100644 -index 0000000..837fe36 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheController.h -@@ -0,0 +1,53 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_CONTROLLER_H -+#define CEPH_CACHE_CONTROLLER_H -+ -+#include "common/Formatter.h" -+#include "common/admin_socket.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "common/WorkQueue.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "include/assert.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+#include "CacheServer.h" -+#include "ObjectCacheStore.h" -+ -+#include <thread> -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class CacheController { -+ public: -+ CacheController(CephContext *cct, const std::vector<const char*> &args); -+ ~CacheController(); -+ -+ int init(); -+ -+ int shutdown(); -+ -+ void handle_signal(int sinnum); -+ -+ void run(); -+ -+ void handle_request(uint64_t sesstion_id, std::string msg); -+ -+ private: -+ CacheServer *m_cache_server; -+ std::vector<const char*> m_args; -+ CephContext *m_cct; -+ ObjectCacheStore *m_object_cache_store; -+ ContextWQ* pcache_op_work_queue; -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc -new file mode 100644 -index 0000000..dd2d47e ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheServer.cc -@@ -0,0 +1,99 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/debug.h" -+#include "common/ceph_context.h" -+#include "CacheServer.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \ -+ << __func__ << ": " -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+CacheServer::CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) -+ : cct(cct), m_server_process_msg(processmsg), -+ m_local_path(file), m_acceptor(m_io_service) {} -+ -+CacheServer::~CacheServer(){} -+ -+void CacheServer::run() { -+ bool ret; -+ ret = start_accept(); -+ if(!ret) { -+ return; -+ } -+ m_io_service.run(); -+} -+ -+// TODO : use callback to replace this function. -+void CacheServer::send(uint64_t session_id, std::string msg) { -+ auto it = m_session_map.find(session_id); -+ if (it != m_session_map.end()) { -+ it->second->send(msg); -+ } else { -+ // TODO : why don't find existing session id ? -+ ldout(cct, 20) << "don't find session id..." << dendl; -+ assert(0); -+ } -+} -+ -+// when creating one acceptor, can control every step in this way. -+bool CacheServer::start_accept() { -+ boost::system::error_code ec; -+ m_acceptor.open(m_local_path.protocol(), ec); -+ if(ec) { -+ ldout(cct, 20) << "m_acceptor open fails: " << ec.message() << dendl; -+ return false; -+ } -+ -+ // TODO control acceptor attribute. -+ -+ m_acceptor.bind(m_local_path, ec); -+ if(ec) { -+ ldout(cct, 20) << "m_acceptor bind fails: " << ec.message() << dendl; -+ return false; -+ } -+ -+ m_acceptor.listen(boost::asio::socket_base::max_connections, ec); -+ if(ec) { -+ ldout(cct, 20) << "m_acceptor listen fails: " << ec.message() << dendl; -+ return false; -+ } -+ -+ accept(); -+ return true; -+} -+ -+void CacheServer::accept() { -+ CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct)); -+ m_acceptor.async_accept(new_session->socket(), -+ boost::bind(&CacheServer::handle_accept, this, new_session, -+ boost::asio::placeholders::error)); -+} -+ -+void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) { -+ -+ if(error) { -+ lderr(cct) << "async accept fails : " << error.message() << dendl; -+ assert(0); // TODO -+ } -+ -+ m_session_map.emplace(m_session_id, new_session); -+ // TODO : session setting -+ new_session->start(); -+ m_session_id++; -+ -+ // lanuch next accept -+ accept(); -+} -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h -new file mode 100644 -index 0000000..6c5c133 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheServer.h -@@ -0,0 +1,54 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_SERVER_H -+#define CEPH_CACHE_SERVER_H -+ -+#include <cstdio> -+#include <iostream> -+#include <array> -+#include <memory> -+#include <string> -+#include <boost/bind.hpp> -+#include <boost/asio.hpp> -+#include <boost/asio/error.hpp> -+#include <boost/algorithm/string.hpp> -+ -+#include "include/assert.h" -+#include "SocketCommon.h" -+#include "CacheSession.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class CacheServer { -+ -+ public: -+ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct); -+ ~CacheServer(); -+ -+ void run(); -+ void send(uint64_t session_id, std::string msg); -+ -+ private: -+ bool start_accept(); -+ void accept(); -+ void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error); -+ -+ private: -+ CephContext* cct; -+ boost::asio::io_service m_io_service; // TODO wrapper it. -+ ProcessMsg m_server_process_msg; -+ stream_protocol::endpoint m_local_path; -+ stream_protocol::acceptor m_acceptor; -+ uint64_t m_session_id = 1; -+ std::map<uint64_t, CacheSessionPtr> m_session_map; -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc -new file mode 100644 -index 0000000..6cffb41 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheSession.cc -@@ -0,0 +1,115 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/debug.h" -+#include "common/ceph_context.h" -+#include "CacheSession.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \ -+ << __func__ << ": " -+ -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct) -+ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct) -+ {} -+ -+CacheSession::~CacheSession(){} -+ -+stream_protocol::socket& CacheSession::socket() { -+ return m_dm_socket; -+} -+ -+void CacheSession::start() { -+ if(true) { -+ serial_handing_request(); -+ } else { -+ parallel_handing_request(); -+ } -+} -+// flow: -+// -+// recv request --> process request --> reply ack -+// | | -+// --------------<------------------------- -+void CacheSession::serial_handing_request() { -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ boost::bind(&CacheSession::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+} -+ -+// flow : -+// -+// --> thread 1: process request -+// recv request --> thread 2: process request --> reply ack -+// --> thread n: process request -+// -+void CacheSession::parallel_handing_request() { -+ // TODO -+} -+ -+void CacheSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred) { -+ // when recv eof, the most proble is that client side close socket. -+ // so, server side need to end handing_request -+ if(error == boost::asio::error::eof) { -+ ldout(cct, 20) << "session: async_read : " << error.message() << dendl; -+ return; -+ } -+ -+ if(error) { -+ ldout(cct, 20) << "session: async_read fails: " << error.message() << dendl; -+ assert(0); -+ } -+ -+ if(bytes_transferred != RBDSC_MSG_LEN) { -+ ldout(cct, 20) << "session : request in-complete. "<<dendl; -+ assert(0); -+ } -+ -+ // TODO async_process can increse coding readable. -+ // process_msg_callback call handle async_send -+ process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); -+} -+ -+void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) { -+ if (error) { -+ ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl; -+ assert(0); -+ } -+ -+ if(bytes_transferred != RBDSC_MSG_LEN) { -+ ldout(cct, 20) << "session : reply in-complete. "<<dendl; -+ assert(0); -+ } -+ -+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ boost::bind(&CacheSession::handle_read, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ -+} -+ -+void CacheSession::send(std::string msg) { -+ boost::asio::async_write(m_dm_socket, -+ boost::asio::buffer(msg.c_str(), msg.size()), -+ boost::asio::transfer_exactly(RBDSC_MSG_LEN), -+ boost::bind(&CacheSession::handle_write, -+ shared_from_this(), -+ boost::asio::placeholders::error, -+ boost::asio::placeholders::bytes_transferred)); -+ -+} -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h -new file mode 100644 -index 0000000..ce2591b ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/CacheSession.h -@@ -0,0 +1,58 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_SESSION_H -+#define CEPH_CACHE_SESSION_H -+ -+#include <iostream> -+#include <string> -+#include <boost/bind.hpp> -+#include <boost/asio.hpp> -+#include <boost/asio/error.hpp> -+#include <boost/algorithm/string.hpp> -+ -+#include "include/assert.h" -+#include "SocketCommon.h" -+ -+ -+using boost::asio::local::stream_protocol; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+class CacheSession : public std::enable_shared_from_this<CacheSession> { -+public: -+ CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct); -+ ~CacheSession(); -+ -+ stream_protocol::socket& socket(); -+ void start(); -+ void serial_handing_request(); -+ void parallel_handing_request(); -+ -+private: -+ -+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred); -+ -+ void handle_write(const boost::system::error_code& error, size_t bytes_transferred); -+ -+public: -+ void send(std::string msg); -+ -+private: -+ uint64_t m_session_id; -+ stream_protocol::socket m_dm_socket; -+ ProcessMsg process_msg; -+ CephContext* cct; -+ -+ // Buffer used to store data received from the client. -+ //std::array<char, 1024> data_; -+ char m_buffer[1024]; -+}; -+ -+typedef std::shared_ptr<CacheSession> CacheSessionPtr; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+ -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc -new file mode 100644 -index 0000000..50721ca ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc -@@ -0,0 +1,172 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "ObjectCacheStore.h" -+ -+#define dout_context g_ceph_context -+#define dout_subsys ceph_subsys_immutable_obj_cache -+#undef dout_prefix -+#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \ -+ << __func__ << ": " -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) -+ : m_cct(cct), m_work_queue(work_queue), -+ m_rados(new librados::Rados()) { -+ -+ uint64_t object_cache_entries = -+ cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); -+ -+ //TODO(): allow to set level -+ m_policy = new SimplePolicy(object_cache_entries, 0.5); -+} -+ -+ObjectCacheStore::~ObjectCacheStore() { -+ delete m_policy; -+} -+ -+int ObjectCacheStore::init(bool reset) { -+ -+ int ret = m_rados->init_with_context(m_cct); -+ if(ret < 0) { -+ lderr(m_cct) << "fail to init Ceph context" << dendl; -+ return ret; -+ } -+ -+ ret = m_rados->connect(); -+ if(ret < 0 ) { -+ lderr(m_cct) << "fail to conect to cluster" << dendl; -+ return ret; -+ } -+ -+ std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); -+ //TODO(): check and reuse existing cache objects -+ if(reset) { -+ std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; -+ //TODO(): to use std::filesystem -+ int r = system(cmd.c_str()); -+ } -+ -+ evict_thd = new std::thread([this]{this->evict_thread_body();}); -+ return ret; -+} -+ -+int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { -+ int ret = 0; -+ std::string cache_file_name = pool_name + object_name; -+ -+ //TODO(): lock on ioctx map -+ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { -+ librados::IoCtx* io_ctx = new librados::IoCtx(); -+ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); -+ if (ret < 0) { -+ lderr(m_cct) << "fail to create ioctx" << dendl; -+ assert(0); -+ } -+ m_ioctxs.emplace(pool_name, io_ctx); -+ } -+ -+ assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); -+ -+ librados::IoCtx* ioctx = m_ioctxs[pool_name]; -+ -+ librados::bufferlist* read_buf = new librados::bufferlist(); -+ int object_size = 4096*1024; //TODO(): read config from image metadata -+ -+ //TODO(): async promote -+ ret = promote_object(ioctx, object_name, read_buf, object_size); -+ if (ret == -ENOENT) { -+ read_buf->append(std::string(object_size, '0')); -+ ret = 0; -+ } -+ -+ if( ret < 0) { -+ lderr(m_cct) << "fail to read from rados" << dendl; -+ return ret; -+ } -+ -+ // persistent to cache -+ librbd::cache::SyncFile cache_file(m_cct, cache_file_name); -+ cache_file.open(); -+ ret = cache_file.write_object_to_file(*read_buf, object_size); -+ -+ // update metadata -+ assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); -+ m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); -+ assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); -+ -+ return ret; -+ -+} -+ -+// return -1, client need to read data from cluster. -+// return 0, client directly read data from cache. -+int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { -+ -+ std::string cache_file_name = pool_name + object_name; -+ -+ CACHESTATUS ret; -+ ret = m_policy->lookup_object(cache_file_name); -+ -+ switch(ret) { -+ case OBJ_CACHE_NONE: -+ return do_promote(pool_name, object_name); -+ case OBJ_CACHE_PROMOTED: -+ return 0; -+ case OBJ_CACHE_PROMOTING: -+ default: -+ return -1; -+ } -+} -+ -+void ObjectCacheStore::evict_thread_body() { -+ int ret; -+ while(m_evict_go) { -+ ret = evict_objects(); -+ } -+} -+ -+ -+int ObjectCacheStore::shutdown() { -+ m_evict_go = false; -+ evict_thd->join(); -+ m_rados->shutdown(); -+ return 0; -+} -+ -+int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { -+ return 0; -+} -+ -+int ObjectCacheStore::lock_cache(std::string vol_name) { -+ return 0; -+} -+ -+int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { -+ int ret; -+ -+ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); -+ -+ ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); -+ if(ret < 0) { -+ lderr(m_cct) << "fail to read from rados" << dendl; -+ return ret; -+ } -+ read_completion->wait_for_complete(); -+ ret = read_completion->get_return_value(); -+ return ret; -+ -+} -+ -+int ObjectCacheStore::evict_objects() { -+ std::list<std::string> obj_list; -+ m_policy->get_evict_list(&obj_list); -+ for (auto& obj: obj_list) { -+ //do_evict(obj); -+ } -+} -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h -new file mode 100644 -index 0000000..d044b27 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h -@@ -0,0 +1,70 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H -+#define CEPH_CACHE_OBJECT_CACHE_STORE_H -+ -+#include "common/debug.h" -+#include "common/errno.h" -+#include "common/ceph_context.h" -+#include "common/Mutex.h" -+#include "include/rados/librados.hpp" -+#include "include/rbd/librbd.h" -+#include "librbd/ImageCtx.h" -+#include "librbd/ImageState.h" -+#include "librbd/cache/SharedPersistentObjectCacherFile.h" -+#include "SimplePolicy.hpp" -+ -+ -+using librados::Rados; -+using librados::IoCtx; -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+typedef shared_ptr<librados::Rados> RadosRef; -+typedef shared_ptr<librados::IoCtx> IoCtxRef; -+ -+class ObjectCacheStore -+{ -+ public: -+ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); -+ ~ObjectCacheStore(); -+ -+ int init(bool reset); -+ -+ int shutdown(); -+ -+ int lookup_object(std::string pool_name, std::string object_name); -+ -+ int init_cache(std::string vol_name, uint64_t vol_size); -+ -+ int lock_cache(std::string vol_name); -+ -+ private: -+ void evict_thread_body(); -+ int evict_objects(); -+ -+ int do_promote(std::string pool_name, std::string object_name); -+ -+ int promote_object(librados::IoCtx*, std::string object_name, -+ librados::bufferlist* read_buf, -+ uint64_t length); -+ -+ CephContext *m_cct; -+ ContextWQ* m_work_queue; -+ RadosRef m_rados; -+ -+ -+ std::map<std::string, librados::IoCtx*> m_ioctxs; -+ -+ librbd::cache::SyncFile *m_cache_file; -+ -+ Policy* m_policy; -+ std::thread* evict_thd; -+ bool m_evict_go = false; -+}; -+ -+} // namespace ceph -+} // namespace immutable_obj_cache -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/Policy.hpp b/src/tools/ceph_immutable_object_cache/Policy.hpp -new file mode 100644 -index 0000000..8090202 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/Policy.hpp -@@ -0,0 +1,33 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_POLICY_HPP -+#define CEPH_CACHE_POLICY_HPP -+ -+#include <list> -+#include <string> -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+enum CACHESTATUS { -+ OBJ_CACHE_NONE = 0, -+ OBJ_CACHE_PROMOTING, -+ OBJ_CACHE_PROMOTED, -+}; -+ -+ -+class Policy { -+public: -+ Policy(){} -+ virtual ~Policy(){}; -+ virtual CACHESTATUS lookup_object(std::string) = 0; -+ virtual int evict_object(std::string&) = 0; -+ virtual void update_status(std::string, CACHESTATUS) = 0; -+ virtual CACHESTATUS get_status(std::string) = 0; -+ virtual void get_evict_list(std::list<std::string>* obj_list) = 0; -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp -new file mode 100644 -index 0000000..757ee6a ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp -@@ -0,0 +1,163 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP -+#define CEPH_CACHE_SIMPLE_POLICY_HPP -+ -+#include "Policy.hpp" -+#include "include/lru.h" -+#include "common/RWLock.h" -+#include "common/Mutex.h" -+ -+#include <vector> -+#include <unordered_map> -+#include <string> -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+ -+class SimplePolicy : public Policy { -+public: -+ SimplePolicy(uint64_t block_num, float watermark) -+ : m_watermark(watermark), m_entry_count(block_num), -+ m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), -+ m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") -+ { -+ -+ for(uint64_t i = 0; i < m_entry_count; i++) { -+ m_free_list.push_back(new Entry()); -+ } -+ -+ } -+ -+ ~SimplePolicy() { -+ for(uint64_t i = 0; i < m_entry_count; i++) { -+ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -+ delete entry; -+ m_free_list.pop_front(); -+ } -+ } -+ -+ CACHESTATUS lookup_object(std::string cache_file_name) { -+ -+ //TODO(): check race condition -+ RWLock::WLocker wlocker(m_cache_map_lock); -+ -+ auto entry_it = m_cache_map.find(cache_file_name); -+ if(entry_it == m_cache_map.end()) { -+ Mutex::Locker locker(m_free_list_lock); -+ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -+ assert(entry != nullptr); -+ m_free_list.pop_front(); -+ entry->status = OBJ_CACHE_PROMOTING; -+ -+ m_cache_map[cache_file_name] = entry; -+ -+ return OBJ_CACHE_NONE; -+ } -+ -+ Entry* entry = entry_it->second; -+ -+ if(entry->status == OBJ_CACHE_PROMOTED) { -+ // touch it -+ m_promoted_lru.lru_touch(entry); -+ } -+ -+ return entry->status; -+ } -+ -+ int evict_object(std::string& out_cache_file_name) { -+ RWLock::WLocker locker(m_cache_map_lock); -+ -+ return 1; -+ } -+ -+ // TODO(): simplify the logic -+ void update_status(std::string file_name, CACHESTATUS new_status) { -+ RWLock::WLocker locker(m_cache_map_lock); -+ -+ Entry* entry; -+ auto entry_it = m_cache_map.find(file_name); -+ -+ // just check. -+ if(new_status == OBJ_CACHE_PROMOTING) { -+ assert(entry_it == m_cache_map.end()); -+ } -+ -+ assert(entry_it != m_cache_map.end()); -+ -+ entry = entry_it->second; -+ -+ // promoting is done, so update it. -+ if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { -+ m_promoted_lru.lru_insert_top(entry); -+ entry->status = new_status; -+ return; -+ } -+ -+ assert(0); -+ } -+ -+ // get entry status -+ CACHESTATUS get_status(std::string file_name) { -+ RWLock::RLocker locker(m_cache_map_lock); -+ auto entry_it = m_cache_map.find(file_name); -+ if(entry_it == m_cache_map.end()) { -+ return OBJ_CACHE_NONE; -+ } -+ -+ return entry_it->second->status; -+ } -+ -+ void get_evict_list(std::list<std::string>* obj_list) { -+ RWLock::WLocker locker(m_cache_map_lock); -+ // check free ratio, pop entries from LRU -+ if (m_free_list.size() / m_entry_count < m_watermark) { -+ int evict_num = 10; //TODO(): make this configurable -+ for(int i = 0; i < evict_num; i++) { -+ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); -+ if (entry == nullptr) { -+ continue; -+ } -+ std::string file_name = entry->cache_file_name; -+ obj_list->push_back(file_name); -+ -+ auto entry_it = m_cache_map.find(file_name); -+ m_cache_map.erase(entry_it); -+ -+ //mark this entry as free -+ entry->status = OBJ_CACHE_NONE; -+ Mutex::Locker locker(m_free_list_lock); -+ m_free_list.push_back(entry); -+ } -+ } -+ } -+ -+private: -+ -+ class Entry : public LRUObject { -+ public: -+ CACHESTATUS status; -+ Entry() : status(OBJ_CACHE_NONE){} -+ std::string cache_file_name; -+ void encode(bufferlist &bl){} -+ void decode(bufferlist::iterator &it){} -+ }; -+ -+ float m_watermark; -+ uint64_t m_entry_count; -+ -+ std::unordered_map<std::string, Entry*> m_cache_map; -+ RWLock m_cache_map_lock; -+ -+ std::deque<Entry*> m_free_list; -+ Mutex m_free_list_lock; -+ -+ LRU m_promoted_lru; // include promoted, using status. -+ -+}; -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h -new file mode 100644 -index 0000000..53dca54 ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/SocketCommon.h -@@ -0,0 +1,54 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#ifndef CEPH_CACHE_SOCKET_COMMON_H -+#define CEPH_CACHE_SOCKET_COMMON_H -+ -+namespace ceph { -+namespace immutable_obj_cache { -+ -+static const int RBDSC_REGISTER = 0X11; -+static const int RBDSC_READ = 0X12; -+static const int RBDSC_LOOKUP = 0X13; -+static const int RBDSC_REGISTER_REPLY = 0X14; -+static const int RBDSC_READ_REPLY = 0X15; -+static const int RBDSC_LOOKUP_REPLY = 0X16; -+static const int RBDSC_READ_RADOS = 0X17; -+ -+ -+ -+typedef std::function<void(uint64_t, std::string)> ProcessMsg; -+typedef std::function<void(std::string)> ClientProcessMsg; -+typedef uint8_t rbdsc_req_type; -+ -+//TODO(): switch to bufferlist -+struct rbdsc_req_type_t { -+ rbdsc_req_type type; -+ uint64_t vol_size; -+ uint64_t offset; -+ uint64_t length; -+ char pool_name[256]; -+ char vol_name[256]; -+ -+ uint64_t size() { -+ return sizeof(rbdsc_req_type_t); -+ } -+ -+ std::string to_buffer() { -+ std::stringstream ss; -+ ss << type; -+ ss << vol_size; -+ ss << offset; -+ ss << length; -+ ss << pool_name; -+ ss << vol_name; -+ -+ return ss.str(); -+ } -+}; -+ -+static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); -+ -+} // namespace immutable_obj_cache -+} // namespace ceph -+#endif -diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc -new file mode 100644 -index 0000000..7a9131d ---- /dev/null -+++ b/src/tools/ceph_immutable_object_cache/main.cc -@@ -0,0 +1,85 @@ -+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -+// vim: ts=8 sw=2 smarttab -+ -+#include "common/ceph_argparse.h" -+#include "common/config.h" -+#include "common/debug.h" -+#include "common/errno.h" -+#include "global/global_init.h" -+#include "global/signal_handler.h" -+#include "CacheController.h" -+ -+#include <vector> -+ -+ceph::immutable_obj_cache::CacheController *cachectl = nullptr; -+ -+void usage() { -+ std::cout << "usage: cache controller [options...]" << std::endl; -+ std::cout << "options:\n"; -+ std::cout << " -m monaddress[:port] connect to specified monitor\n"; -+ std::cout << " --keyring=<path> path to keyring for local cluster\n"; -+ std::cout << " --log-file=<logfile> file to log debug output\n"; -+ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; -+ generic_server_usage(); -+} -+ -+static void handle_signal(int signum) -+{ -+ if (cachectl) -+ cachectl->handle_signal(signum); -+} -+ -+int main(int argc, const char **argv) -+{ -+ std::vector<const char*> args; -+ env_to_vec(args); -+ argv_to_vec(argc, argv, args); -+ -+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, -+ CODE_ENVIRONMENT_DAEMON, -+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); -+ -+ for (auto i = args.begin(); i != args.end(); ++i) { -+ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { -+ usage(); -+ return EXIT_SUCCESS; -+ } -+ } -+ -+ if (g_conf()->daemonize) { -+ global_init_daemonize(g_ceph_context); -+ } -+ g_ceph_context->enable_perf_counter(); -+ -+ common_init_finish(g_ceph_context); -+ -+ init_async_signal_handler(); -+ register_async_signal_handler(SIGHUP, sighup_handler); -+ register_async_signal_handler_oneshot(SIGINT, handle_signal); -+ register_async_signal_handler_oneshot(SIGTERM, handle_signal); -+ -+ std::vector<const char*> cmd_args; -+ argv_to_vec(argc, argv, cmd_args); -+ -+ // disable unnecessary librbd cache -+ g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); -+ -+ cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args); -+ int r = cachectl->init(); -+ if (r < 0) { -+ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; -+ goto cleanup; -+ } -+ -+ cachectl->run(); -+ -+ cleanup: -+ unregister_async_signal_handler(SIGHUP, sighup_handler); -+ unregister_async_signal_handler(SIGINT, handle_signal); -+ unregister_async_signal_handler(SIGTERM, handle_signal); -+ shutdown_async_signal_handler(); -+ -+ delete cachectl; -+ -+ return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; -+} -diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt -deleted file mode 100644 -index 597d802..0000000 ---- a/src/tools/rbd_cache/CMakeLists.txt -+++ /dev/null -@@ -1,9 +0,0 @@ --add_executable(rbd-cache -- ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc -- ObjectCacheStore.cc -- CacheController.cc -- main.cc) --target_link_libraries(rbd-cache -- librados -- global) --install(TARGETS rbd-cache DESTINATION bin) -diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc -deleted file mode 100644 -index 620192c..0000000 ---- a/src/tools/rbd_cache/CacheController.cc -+++ /dev/null -@@ -1,116 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "CacheController.h" -- --#define dout_context g_ceph_context --#define dout_subsys ceph_subsys_rbd_cache --#undef dout_prefix --#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \ -- << __func__ << ": " -- --namespace rbd { --namespace cache { -- --class ThreadPoolSingleton : public ThreadPool { --public: -- ContextWQ *op_work_queue; -- -- explicit ThreadPoolSingleton(CephContext *cct) -- : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32, -- "pcache_threads"), -- op_work_queue(new ContextWQ("librbd::pcache_op_work_queue", -- cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"), -- this)) { -- start(); -- } -- ~ThreadPoolSingleton() override { -- op_work_queue->drain(); -- delete op_work_queue; -- -- stop(); -- } --}; -- -- --CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args): -- m_args(args), m_cct(cct) { -- --} -- --CacheController::~CacheController() { -- --} -- --int CacheController::init() { -- ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( -- "rbd::cache::thread_pool", false, m_cct); -- pcache_op_work_queue = thread_pool_singleton->op_work_queue; -- -- m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue); -- int r = m_object_cache_store->init(false); -- if (r < 0) { -- //derr << "init error\n" << dendl; -- } -- return r; --} -- --int CacheController::shutdown() { -- int r = m_object_cache_store->shutdown(); -- return r; --} -- --void CacheController::handle_signal(int signum){} -- --void CacheController::run() { -- try { -- //TODO(): use new socket path -- std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock"); -- std::remove(controller_path.c_str()); -- -- m_cache_server = new CacheServer(controller_path, -- ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct); -- m_cache_server->run(); -- } catch (std::exception& e) { -- std::cerr << "Exception: " << e.what() << "\n"; -- } --} -- --void CacheController::handle_request(uint64_t session_id, std::string msg){ -- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str()); -- -- int ret = 0; -- -- switch (io_ctx->type) { -- case RBDSC_REGISTER: { -- // init cache layout for volume -- m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size); -- io_ctx->type = RBDSC_REGISTER_REPLY; -- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); -- -- break; -- } -- case RBDSC_READ: { -- // lookup object in local cache store -- ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name); -- if (ret < 0) { -- io_ctx->type = RBDSC_READ_RADOS; -- } else { -- io_ctx->type = RBDSC_READ_REPLY; -- } -- if (io_ctx->type != RBDSC_READ_REPLY) { -- assert(0); -- } -- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size())); -- -- break; -- } -- std::cout<<"can't recongize request"<<std::endl; -- assert(0); // TODO replace it. -- } --} -- --} // namespace rbd --} // namespace cache -- -- -diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h -deleted file mode 100644 -index 0e23484..0000000 ---- a/src/tools/rbd_cache/CacheController.h -+++ /dev/null -@@ -1,54 +0,0 @@ --#ifndef CACHE_CONTROLLER_H --#define CACHE_CONTROLLER_H -- --#include <thread> -- --#include "common/Formatter.h" --#include "common/admin_socket.h" --#include "common/debug.h" --#include "common/errno.h" --#include "common/ceph_context.h" --#include "common/Mutex.h" --#include "common/WorkQueue.h" --#include "include/rados/librados.hpp" --#include "include/rbd/librbd.h" --#include "include/assert.h" --#include "librbd/ImageCtx.h" --#include "librbd/ImageState.h" -- --#include "CacheControllerSocket.hpp" --#include "ObjectCacheStore.h" -- -- --using boost::asio::local::stream_protocol; -- --namespace rbd { --namespace cache { -- --class CacheController { -- public: -- CacheController(CephContext *cct, const std::vector<const char*> &args); -- ~CacheController(); -- -- int init(); -- -- int shutdown(); -- -- void handle_signal(int sinnum); -- -- void run(); -- -- void handle_request(uint64_t sesstion_id, std::string msg); -- -- private: -- CacheServer *m_cache_server; -- std::vector<const char*> m_args; -- CephContext *m_cct; -- ObjectCacheStore *m_object_cache_store; -- ContextWQ* pcache_op_work_queue; --}; -- --} // namespace rbd --} // namespace cache -- --#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp -deleted file mode 100644 -index 2ff7477..0000000 ---- a/src/tools/rbd_cache/CacheControllerSocket.hpp -+++ /dev/null -@@ -1,228 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CACHE_CONTROLLER_SOCKET_H --#define CACHE_CONTROLLER_SOCKET_H -- --#include <cstdio> --#include <iostream> --#include <array> --#include <memory> --#include <string> --#include <boost/bind.hpp> --#include <boost/asio.hpp> --#include <boost/asio/error.hpp> --#include <boost/algorithm/string.hpp> --#include "CacheControllerSocketCommon.h" -- -- --using boost::asio::local::stream_protocol; -- --namespace rbd { --namespace cache { -- --class session : public std::enable_shared_from_this<session> { --public: -- session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg) -- : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {} -- -- stream_protocol::socket& socket() { -- return m_dm_socket; -- } -- -- void start() { -- if(true) { -- serial_handing_request(); -- } else { -- parallel_handing_request(); -- } -- } -- // flow: -- // -- // recv request --> process request --> reply ack -- // | | -- // --------------<------------------------- -- void serial_handing_request() { -- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- boost::bind(&session::handle_read, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -- } -- -- // flow : -- // -- // --> thread 1: process request -- // recv request --> thread 2: process request --> reply ack -- // --> thread n: process request -- // -- void parallel_handing_request() { -- // TODO -- } -- --private: -- -- void handle_read(const boost::system::error_code& error, size_t bytes_transferred) { -- // when recv eof, the most proble is that client side close socket. -- // so, server side need to end handing_request -- if(error == boost::asio::error::eof) { -- std::cout<<"session: async_read : " << error.message() << std::endl; -- return; -- } -- -- if(error) { -- std::cout<<"session: async_read fails: " << error.message() << std::endl; -- assert(0); -- } -- -- if(bytes_transferred != RBDSC_MSG_LEN) { -- std::cout<<"session : request in-complete. "<<std::endl; -- assert(0); -- } -- -- // TODO async_process can increse coding readable. -- // process_msg_callback call handle async_send -- process_msg(m_session_id, std::string(m_buffer, bytes_transferred)); -- } -- -- void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { -- if (error) { -- std::cout<<"session: async_write fails: " << error.message() << std::endl; -- assert(0); -- } -- -- if(bytes_transferred != RBDSC_MSG_LEN) { -- std::cout<<"session : reply in-complete. "<<std::endl; -- assert(0); -- } -- -- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- boost::bind(&session::handle_read, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -- -- } -- --public: -- void send(std::string msg) { -- boost::asio::async_write(m_dm_socket, -- boost::asio::buffer(msg.c_str(), msg.size()), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- boost::bind(&session::handle_write, -- shared_from_this(), -- boost::asio::placeholders::error, -- boost::asio::placeholders::bytes_transferred)); -- -- } -- --private: -- uint64_t m_session_id; -- stream_protocol::socket m_dm_socket; -- ProcessMsg process_msg; -- -- // Buffer used to store data received from the client. -- //std::array<char, 1024> data_; -- char m_buffer[1024]; --}; -- --typedef std::shared_ptr<session> session_ptr; -- --class CacheServer { --public: -- CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct) -- : m_cct(cct), m_server_process_msg(processmsg), -- m_local_path(file), -- m_acceptor(m_io_service) -- {} -- -- void run() { -- bool ret; -- ret = start_accept(); -- if(!ret) { -- return; -- } -- m_io_service.run(); -- } -- -- // TODO : use callback to replace this function. -- void send(uint64_t session_id, std::string msg) { -- auto it = m_session_map.find(session_id); -- if (it != m_session_map.end()) { -- it->second->send(msg); -- } else { -- // TODO : why don't find existing session id ? -- std::cout<<"don't find session id..."<<std::endl; -- assert(0); -- } -- } -- --private: -- // when creating one acceptor, can control every step in this way. -- bool start_accept() { -- boost::system::error_code ec; -- m_acceptor.open(m_local_path.protocol(), ec); -- if(ec) { -- std::cout << "m_acceptor open fails: " << ec.message() << std::endl; -- return false; -- } -- -- // TODO control acceptor attribute. -- -- m_acceptor.bind(m_local_path, ec); -- if(ec) { -- std::cout << "m_acceptor bind fails: " << ec.message() << std::endl; -- return false; -- } -- -- m_acceptor.listen(boost::asio::socket_base::max_connections, ec); -- if(ec) { -- std::cout << "m_acceptor listen fails: " << ec.message() << std::endl; -- return false; -- } -- -- accept(); -- return true; -- } -- -- void accept() { -- session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg)); -- m_acceptor.async_accept(new_session->socket(), -- boost::bind(&CacheServer::handle_accept, this, new_session, -- boost::asio::placeholders::error)); -- } -- -- void handle_accept(session_ptr new_session, const boost::system::error_code& error) { -- //TODO(): open librbd snap ... yuan -- -- if(error) { -- std::cout << "async accept fails : " << error.message() << std::endl; -- assert(0); // TODO -- } -- -- // must put session into m_session_map at the front of session.start() -- m_session_map.emplace(m_session_id, new_session); -- // TODO : session setting -- new_session->start(); -- m_session_id++; -- -- // lanuch next accept -- accept(); -- } -- --private: -- CephContext* m_cct; -- boost::asio::io_service m_io_service; // TODO wrapper it. -- ProcessMsg m_server_process_msg; -- stream_protocol::endpoint m_local_path; -- stream_protocol::acceptor m_acceptor; -- uint64_t m_session_id = 1; -- std::map<uint64_t, session_ptr> m_session_map; --}; -- --} // namespace cache --} // namespace rbd -- --#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp -deleted file mode 100644 -index 964f888..0000000 ---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp -+++ /dev/null -@@ -1,229 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H --#define CACHE_CONTROLLER_SOCKET_CLIENT_H -- --#include <atomic> --#include <boost/asio.hpp> --#include <boost/bind.hpp> --#include <boost/asio/error.hpp> --#include <boost/algorithm/string.hpp> --#include "librbd/ImageCtx.h" --#include "include/assert.h" --#include "include/Context.h" --#include "CacheControllerSocketCommon.h" -- -- --using boost::asio::local::stream_protocol; -- --namespace rbd { --namespace cache { -- --class CacheClient { --public: -- CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx) -- : m_io_service_work(m_io_service), -- m_dm_socket(m_io_service), -- m_client_process_msg(processmsg), -- m_ep(stream_protocol::endpoint(file)), -- m_session_work(false), -- cct(ceph_ctx) -- { -- // TODO wrapper io_service -- std::thread thd([this](){ -- m_io_service.run();}); -- thd.detach(); -- } -- -- void run(){ -- } -- -- bool is_session_work() { -- return m_session_work.load() == true; -- } -- -- // just when error occur, call this method. -- void close() { -- m_session_work.store(false); -- boost::system::error_code close_ec; -- m_dm_socket.close(close_ec); -- if(close_ec) { -- std::cout << "close: " << close_ec.message() << std::endl; -- } -- std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl; -- } -- -- int connect() { -- boost::system::error_code ec; -- m_dm_socket.connect(m_ep, ec); -- if(ec) { -- if(ec == boost::asio::error::connection_refused) { -- std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. " -- << "Now data will be read from ceph cluster " << std::endl; -- } else { -- std::cout << "connect: " << ec.message() << std::endl; -- } -- -- if(m_dm_socket.is_open()) { -- // Set to indicate what error occurred, if any. -- // Note that, even if the function indicates an error, -- // the underlying descriptor is closed. -- boost::system::error_code close_ec; -- m_dm_socket.close(close_ec); -- if(close_ec) { -- std::cout << "close: " << close_ec.message() << std::endl; -- } -- } -- return -1; -- } -- -- std::cout<<"connect success"<<std::endl; -- -- return 0; -- } -- -- int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) { -- // cache controller will init layout -- rbdsc_req_type_t *message = new rbdsc_req_type_t(); -- message->type = RBDSC_REGISTER; -- memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -- memcpy(message->vol_name, vol_name.c_str(), vol_name.size()); -- message->vol_size = vol_size; -- message->offset = 0; -- message->length = 0; -- -- uint64_t ret; -- boost::system::error_code ec; -- -- ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec); -- if(ec) { -- std::cout << "write fails : " << ec.message() << std::endl; -- return -1; -- } -- -- if(ret != message->size()) { -- std::cout << "write fails : ret != send_bytes "<< std::endl; -- return -1; -- } -- -- // hard code TODO -- ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec); -- if(ec == boost::asio::error::eof) { -- std::cout<< "recv eof"<<std::endl; -- return -1; -- } -- -- if(ec) { -- std::cout << "write fails : " << ec.message() << std::endl; -- return -1; -- } -- -- if(ret != RBDSC_MSG_LEN) { -- std::cout << "write fails : ret != receive bytes " << std::endl; -- return -1; -- } -- -- m_client_process_msg(std::string(m_recv_buffer, ret)); -- -- delete message; -- -- std::cout << "register volume success" << std::endl; -- -- // TODO -- m_session_work.store(true); -- -- return 0; -- } -- -- // if occur any error, we just return false. Then read from rados. -- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) { -- rbdsc_req_type_t *message = new rbdsc_req_type_t(); -- message->type = RBDSC_READ; -- memcpy(message->pool_name, pool_name.c_str(), pool_name.size()); -- memcpy(message->vol_name, object_id.c_str(), object_id.size()); -- message->vol_size = 0; -- message->offset = 0; -- message->length = 0; -- -- boost::asio::async_write(m_dm_socket, -- boost::asio::buffer((char*)message, message->size()), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- [this, on_finish, message](const boost::system::error_code& err, size_t cb) { -- delete message; -- if(err) { -- std::cout<< "lookup_object: async_write fails." << err.message() << std::endl; -- close(); -- on_finish->complete(false); -- return; -- } -- if(cb != RBDSC_MSG_LEN) { -- std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl; -- close(); -- on_finish->complete(false); -- return; -- } -- get_result(on_finish); -- }); -- -- return 0; -- } -- -- void get_result(Context* on_finish) { -- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), -- boost::asio::transfer_exactly(RBDSC_MSG_LEN), -- [this, on_finish](const boost::system::error_code& err, size_t cb) { -- if(err == boost::asio::error::eof) { -- std::cout<<"get_result: ack is EOF." << std::endl; -- close(); -- on_finish->complete(false); -- return; -- } -- if(err) { -- std::cout<< "get_result: async_read fails:" << err.message() << std::endl; -- close(); -- on_finish->complete(false); // TODO replace this assert with some metohds. -- return; -- } -- if (cb != RBDSC_MSG_LEN) { -- close(); -- std::cout << "get_result: in-complete ack." << std::endl; -- on_finish->complete(false); // TODO: replace this assert with some methods. -- } -- -- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer); -- -- // TODO: re-occur yuan's bug -- if(io_ctx->type == RBDSC_READ) { -- std::cout << "get rbdsc_read... " << std::endl; -- assert(0); -- } -- -- if (io_ctx->type == RBDSC_READ_REPLY) { -- on_finish->complete(true); -- return; -- } else { -- on_finish->complete(false); -- return; -- } -- }); -- } -- --private: -- boost::asio::io_service m_io_service; -- boost::asio::io_service::work m_io_service_work; -- stream_protocol::socket m_dm_socket; -- ClientProcessMsg m_client_process_msg; -- stream_protocol::endpoint m_ep; -- char m_recv_buffer[1024]; -- -- // atomic modfiy for this variable. -- // thread 1 : asio callback thread modify it. -- // thread 2 : librbd read it. -- std::atomic<bool> m_session_work; -- CephContext* cct; --}; -- --} // namespace cache --} // namespace rbd --#endif -diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h -deleted file mode 100644 -index e17529a..0000000 ---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h -+++ /dev/null -@@ -1,62 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H --#define CACHE_CONTROLLER_SOCKET_COMMON_H -- --/* --#define RBDSC_REGISTER 0X11 --#define RBDSC_READ 0X12 --#define RBDSC_LOOKUP 0X13 --#define RBDSC_REGISTER_REPLY 0X14 --#define RBDSC_READ_REPLY 0X15 --#define RBDSC_LOOKUP_REPLY 0X16 --#define RBDSC_READ_RADOS 0X17 --*/ -- --namespace rbd { --namespace cache { -- --static const int RBDSC_REGISTER = 0X11; --static const int RBDSC_READ = 0X12; --static const int RBDSC_LOOKUP = 0X13; --static const int RBDSC_REGISTER_REPLY = 0X14; --static const int RBDSC_READ_REPLY = 0X15; --static const int RBDSC_LOOKUP_REPLY = 0X16; --static const int RBDSC_READ_RADOS = 0X17; -- -- -- --typedef std::function<void(uint64_t, std::string)> ProcessMsg; --typedef std::function<void(std::string)> ClientProcessMsg; --typedef uint8_t rbdsc_req_type; --struct rbdsc_req_type_t { -- rbdsc_req_type type; -- uint64_t vol_size; -- uint64_t offset; -- uint64_t length; -- char pool_name[256]; -- char vol_name[256]; -- -- uint64_t size() { -- return sizeof(rbdsc_req_type_t); -- } -- -- std::string to_buffer() { -- std::stringstream ss; -- ss << type; -- ss << vol_size; -- ss << offset; -- ss << length; -- ss << pool_name; -- ss << vol_name; -- -- return ss.str(); -- } --}; -- --static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t); -- --} // namespace cache --} // namespace rbd --#endif -diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc -deleted file mode 100644 -index 99f90d6..0000000 ---- a/src/tools/rbd_cache/ObjectCacheStore.cc -+++ /dev/null -@@ -1,172 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "ObjectCacheStore.h" -- --#define dout_context g_ceph_context --#define dout_subsys ceph_subsys_rbd_cache --#undef dout_prefix --#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \ -- << __func__ << ": " -- --namespace rbd { --namespace cache { -- --ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue) -- : m_cct(cct), m_work_queue(work_queue), -- m_rados(new librados::Rados()) { -- -- uint64_t object_cache_entries = -- cct->_conf.get_val<int64_t>("rbd_shared_cache_entries"); -- -- //TODO(): allow to set level -- m_policy = new SimplePolicy(object_cache_entries, 0.5); --} -- --ObjectCacheStore::~ObjectCacheStore() { -- delete m_policy; --} -- --int ObjectCacheStore::init(bool reset) { -- -- int ret = m_rados->init_with_context(m_cct); -- if(ret < 0) { -- lderr(m_cct) << "fail to init Ceph context" << dendl; -- return ret; -- } -- -- ret = m_rados->connect(); -- if(ret < 0 ) { -- lderr(m_cct) << "fail to conect to cluster" << dendl; -- return ret; -- } -- -- std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path"); -- //TODO(): check and reuse existing cache objects -- if(reset) { -- std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path; -- //TODO(): to use std::filesystem -- int r = system(cmd.c_str()); -- } -- -- evict_thd = new std::thread([this]{this->evict_thread_body();}); -- return ret; --} -- --int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) { -- int ret = 0; -- std::string cache_file_name = pool_name + object_name; -- -- //TODO(): lock on ioctx map -- if (m_ioctxs.find(pool_name) == m_ioctxs.end()) { -- librados::IoCtx* io_ctx = new librados::IoCtx(); -- ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx); -- if (ret < 0) { -- lderr(m_cct) << "fail to create ioctx" << dendl; -- assert(0); -- } -- m_ioctxs.emplace(pool_name, io_ctx); -- } -- -- assert(m_ioctxs.find(pool_name) != m_ioctxs.end()); -- -- librados::IoCtx* ioctx = m_ioctxs[pool_name]; -- -- librados::bufferlist* read_buf = new librados::bufferlist(); -- int object_size = 4096*1024; //TODO(): read config from image metadata -- -- //TODO(): async promote -- ret = promote_object(ioctx, object_name, read_buf, object_size); -- if (ret == -ENOENT) { -- read_buf->append(std::string(object_size, '0')); -- ret = 0; -- } -- -- if( ret < 0) { -- lderr(m_cct) << "fail to read from rados" << dendl; -- return ret; -- } -- -- // persistent to cache -- librbd::cache::SyncFile cache_file(m_cct, cache_file_name); -- cache_file.open(); -- ret = cache_file.write_object_to_file(*read_buf, object_size); -- -- // update metadata -- assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name)); -- m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED); -- assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name)); -- -- return ret; -- --} -- --// return -1, client need to read data from cluster. --// return 0, client directly read data from cache. --int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) { -- -- std::string cache_file_name = pool_name + object_name; -- -- CACHESTATUS ret; -- ret = m_policy->lookup_object(cache_file_name); -- -- switch(ret) { -- case OBJ_CACHE_NONE: -- return do_promote(pool_name, object_name); -- case OBJ_CACHE_PROMOTED: -- return 0; -- case OBJ_CACHE_PROMOTING: -- default: -- return -1; -- } --} -- --void ObjectCacheStore::evict_thread_body() { -- int ret; -- while(m_evict_go) { -- ret = evict_objects(); -- } --} -- -- --int ObjectCacheStore::shutdown() { -- m_evict_go = false; -- evict_thd->join(); -- m_rados->shutdown(); -- return 0; --} -- --int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) { -- return 0; --} -- --int ObjectCacheStore::lock_cache(std::string vol_name) { -- return 0; --} -- --int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) { -- int ret; -- -- librados::AioCompletion* read_completion = librados::Rados::aio_create_completion(); -- -- ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0); -- if(ret < 0) { -- lderr(m_cct) << "fail to read from rados" << dendl; -- return ret; -- } -- read_completion->wait_for_complete(); -- ret = read_completion->get_return_value(); -- return ret; -- --} -- --int ObjectCacheStore::evict_objects() { -- std::list<std::string> obj_list; -- m_policy->get_evict_list(&obj_list); -- for (auto& obj: obj_list) { -- //do_evict(obj); -- } --} -- --} // namespace cache --} // namespace rbd -diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h -deleted file mode 100644 -index ba0e1f1..0000000 ---- a/src/tools/rbd_cache/ObjectCacheStore.h -+++ /dev/null -@@ -1,70 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#ifndef OBJECT_CACHE_STORE_H --#define OBJECT_CACHE_STORE_H -- --#include "common/debug.h" --#include "common/errno.h" --#include "common/ceph_context.h" --#include "common/Mutex.h" --#include "include/rados/librados.hpp" --#include "include/rbd/librbd.h" --#include "librbd/ImageCtx.h" --#include "librbd/ImageState.h" --#include "librbd/cache/SharedPersistentObjectCacherFile.h" --#include "SimplePolicy.hpp" -- -- --using librados::Rados; --using librados::IoCtx; -- --namespace rbd { --namespace cache { -- --typedef shared_ptr<librados::Rados> RadosRef; --typedef shared_ptr<librados::IoCtx> IoCtxRef; -- --class ObjectCacheStore --{ -- public: -- ObjectCacheStore(CephContext *cct, ContextWQ* work_queue); -- ~ObjectCacheStore(); -- -- int init(bool reset); -- -- int shutdown(); -- -- int lookup_object(std::string pool_name, std::string object_name); -- -- int init_cache(std::string vol_name, uint64_t vol_size); -- -- int lock_cache(std::string vol_name); -- -- private: -- void evict_thread_body(); -- int evict_objects(); -- -- int do_promote(std::string pool_name, std::string object_name); -- -- int promote_object(librados::IoCtx*, std::string object_name, -- librados::bufferlist* read_buf, -- uint64_t length); -- -- CephContext *m_cct; -- ContextWQ* m_work_queue; -- RadosRef m_rados; -- -- -- std::map<std::string, librados::IoCtx*> m_ioctxs; -- -- librbd::cache::SyncFile *m_cache_file; -- -- Policy* m_policy; -- std::thread* evict_thd; -- bool m_evict_go = false; --}; -- --} // namespace rbd --} // namespace cache --#endif -diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp -deleted file mode 100644 -index 711e3bd..0000000 ---- a/src/tools/rbd_cache/Policy.hpp -+++ /dev/null -@@ -1,30 +0,0 @@ --#ifndef RBD_CACHE_POLICY_HPP --#define RBD_CACHE_POLICY_HPP -- --#include <list> --#include <string> -- --namespace rbd { --namespace cache { -- --enum CACHESTATUS { -- OBJ_CACHE_NONE = 0, -- OBJ_CACHE_PROMOTING, -- OBJ_CACHE_PROMOTED, --}; -- -- --class Policy { --public: -- Policy(){} -- virtual ~Policy(){}; -- virtual CACHESTATUS lookup_object(std::string) = 0; -- virtual int evict_object(std::string&) = 0; -- virtual void update_status(std::string, CACHESTATUS) = 0; -- virtual CACHESTATUS get_status(std::string) = 0; -- virtual void get_evict_list(std::list<std::string>* obj_list) = 0; --}; -- --} // namespace cache --} // namespace rbd --#endif -diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp -deleted file mode 100644 -index e785de1..0000000 ---- a/src/tools/rbd_cache/SimplePolicy.hpp -+++ /dev/null -@@ -1,160 +0,0 @@ --#ifndef RBD_CACHE_SIMPLE_POLICY_HPP --#define RBD_CACHE_SIMPLE_POLICY_HPP -- --#include "Policy.hpp" --#include "include/lru.h" --#include "common/RWLock.h" --#include "common/Mutex.h" -- --#include <vector> --#include <unordered_map> --#include <string> -- --namespace rbd { --namespace cache { -- -- --class SimplePolicy : public Policy { --public: -- SimplePolicy(uint64_t block_num, float watermark) -- : m_watermark(watermark), m_entry_count(block_num), -- m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"), -- m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock") -- { -- -- for(uint64_t i = 0; i < m_entry_count; i++) { -- m_free_list.push_back(new Entry()); -- } -- -- } -- -- ~SimplePolicy() { -- for(uint64_t i = 0; i < m_entry_count; i++) { -- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -- delete entry; -- m_free_list.pop_front(); -- } -- } -- -- CACHESTATUS lookup_object(std::string cache_file_name) { -- -- //TODO(): check race condition -- RWLock::WLocker wlocker(m_cache_map_lock); -- -- auto entry_it = m_cache_map.find(cache_file_name); -- if(entry_it == m_cache_map.end()) { -- Mutex::Locker locker(m_free_list_lock); -- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front()); -- assert(entry != nullptr); -- m_free_list.pop_front(); -- entry->status = OBJ_CACHE_PROMOTING; -- -- m_cache_map[cache_file_name] = entry; -- -- return OBJ_CACHE_NONE; -- } -- -- Entry* entry = entry_it->second; -- -- if(entry->status == OBJ_CACHE_PROMOTED) { -- // touch it -- m_promoted_lru.lru_touch(entry); -- } -- -- return entry->status; -- } -- -- int evict_object(std::string& out_cache_file_name) { -- RWLock::WLocker locker(m_cache_map_lock); -- -- return 1; -- } -- -- // TODO(): simplify the logic -- void update_status(std::string file_name, CACHESTATUS new_status) { -- RWLock::WLocker locker(m_cache_map_lock); -- -- Entry* entry; -- auto entry_it = m_cache_map.find(file_name); -- -- // just check. -- if(new_status == OBJ_CACHE_PROMOTING) { -- assert(entry_it == m_cache_map.end()); -- } -- -- assert(entry_it != m_cache_map.end()); -- -- entry = entry_it->second; -- -- // promoting is done, so update it. -- if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) { -- m_promoted_lru.lru_insert_top(entry); -- entry->status = new_status; -- return; -- } -- -- assert(0); -- } -- -- // get entry status -- CACHESTATUS get_status(std::string file_name) { -- RWLock::RLocker locker(m_cache_map_lock); -- auto entry_it = m_cache_map.find(file_name); -- if(entry_it == m_cache_map.end()) { -- return OBJ_CACHE_NONE; -- } -- -- return entry_it->second->status; -- } -- -- void get_evict_list(std::list<std::string>* obj_list) { -- RWLock::WLocker locker(m_cache_map_lock); -- // check free ratio, pop entries from LRU -- if (m_free_list.size() / m_entry_count < m_watermark) { -- int evict_num = 10; //TODO(): make this configurable -- for(int i = 0; i < evict_num; i++) { -- Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire()); -- if (entry == nullptr) { -- continue; -- } -- std::string file_name = entry->cache_file_name; -- obj_list->push_back(file_name); -- -- auto entry_it = m_cache_map.find(file_name); -- m_cache_map.erase(entry_it); -- -- //mark this entry as free -- entry->status = OBJ_CACHE_NONE; -- Mutex::Locker locker(m_free_list_lock); -- m_free_list.push_back(entry); -- } -- } -- } -- --private: -- -- class Entry : public LRUObject { -- public: -- CACHESTATUS status; -- Entry() : status(OBJ_CACHE_NONE){} -- std::string cache_file_name; -- void encode(bufferlist &bl){} -- void decode(bufferlist::iterator &it){} -- }; -- -- float m_watermark; -- uint64_t m_entry_count; -- -- std::unordered_map<std::string, Entry*> m_cache_map; -- RWLock m_cache_map_lock; -- -- std::deque<Entry*> m_free_list; -- Mutex m_free_list_lock; -- -- LRU m_promoted_lru; // include promoted, using status. -- --}; -- --} // namespace cache --} // namespace rbd --#endif -diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc -deleted file mode 100644 -index d604760..0000000 ---- a/src/tools/rbd_cache/main.cc -+++ /dev/null -@@ -1,85 +0,0 @@ --// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- --// vim: ts=8 sw=2 smarttab -- --#include "common/ceph_argparse.h" --#include "common/config.h" --#include "common/debug.h" --#include "common/errno.h" --#include "global/global_init.h" --#include "global/signal_handler.h" --#include "CacheController.h" -- --#include <vector> -- --rbd::cache::CacheController *cachectl = nullptr; -- --void usage() { -- std::cout << "usage: cache controller [options...]" << std::endl; -- std::cout << "options:\n"; -- std::cout << " -m monaddress[:port] connect to specified monitor\n"; -- std::cout << " --keyring=<path> path to keyring for local cluster\n"; -- std::cout << " --log-file=<logfile> file to log debug output\n"; -- std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n"; -- generic_server_usage(); --} -- --static void handle_signal(int signum) --{ -- if (cachectl) -- cachectl->handle_signal(signum); --} -- --int main(int argc, const char **argv) --{ -- std::vector<const char*> args; -- env_to_vec(args); -- argv_to_vec(argc, argv, args); -- -- auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT, -- CODE_ENVIRONMENT_DAEMON, -- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); -- -- for (auto i = args.begin(); i != args.end(); ++i) { -- if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) { -- usage(); -- return EXIT_SUCCESS; -- } -- } -- -- if (g_conf()->daemonize) { -- global_init_daemonize(g_ceph_context); -- } -- g_ceph_context->enable_perf_counter(); -- -- common_init_finish(g_ceph_context); -- -- init_async_signal_handler(); -- register_async_signal_handler(SIGHUP, sighup_handler); -- register_async_signal_handler_oneshot(SIGINT, handle_signal); -- register_async_signal_handler_oneshot(SIGTERM, handle_signal); -- -- std::vector<const char*> cmd_args; -- argv_to_vec(argc, argv, cmd_args); -- -- // disable unnecessary librbd cache -- g_ceph_context->_conf.set_val_or_die("rbd_cache", "false"); -- -- cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args); -- int r = cachectl->init(); -- if (r < 0) { -- std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl; -- goto cleanup; -- } -- -- cachectl->run(); -- -- cleanup: -- unregister_async_signal_handler(SIGHUP, sighup_handler); -- unregister_async_signal_handler(SIGINT, handle_signal); -- unregister_async_signal_handler(SIGTERM, handle_signal); -- shutdown_async_signal_handler(); -- -- delete cachectl; -- -- return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE; --} --- -2.7.4 - |