Diffstat (limited to 'src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch')
-rw-r--r--  src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch  3510
1 file changed, 0 insertions(+), 3510 deletions(-)
diff --git a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch
deleted file mode 100644
index 46040e6..0000000
--- a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch
+++ /dev/null
@@ -1,3510 +0,0 @@
-From 9a097084d06e7186206b43d9c81d1f648791d7a4 Mon Sep 17 00:00:00 2001
-From: Yuan Zhou <yuan.zhou@intel.com>
-Date: Fri, 7 Sep 2018 08:29:51 +0800
-Subject: [PATCH 10/10] librbd: new namespace ceph immutable obj cache
-
-clean up class/func names to use the new namespace
-
-Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
----
- src/common/options.cc | 2 +-
- src/common/subsys.h | 3 +-
- src/librbd/CMakeLists.txt | 3 +-
- .../SharedPersistentObjectCacherObjectDispatch.cc | 175 ----------------
- .../SharedPersistentObjectCacherObjectDispatch.h | 133 ------------
- src/librbd/cache/SharedReadOnlyObjectDispatch.cc | 170 +++++++++++++++
- src/librbd/cache/SharedReadOnlyObjectDispatch.h | 126 ++++++++++++
- src/librbd/image/OpenRequest.cc | 4 +-
- src/tools/CMakeLists.txt | 2 +-
- .../ceph_immutable_object_cache/CMakeLists.txt | 11 +
- .../ceph_immutable_object_cache/CacheClient.cc | 205 ++++++++++++++++++
- .../ceph_immutable_object_cache/CacheClient.h | 53 +++++
- .../ceph_immutable_object_cache/CacheController.cc | 117 +++++++++++
- .../ceph_immutable_object_cache/CacheController.h | 53 +++++
- .../ceph_immutable_object_cache/CacheServer.cc | 99 +++++++++
- .../ceph_immutable_object_cache/CacheServer.h | 54 +++++
- .../ceph_immutable_object_cache/CacheSession.cc | 115 +++++++++++
- .../ceph_immutable_object_cache/CacheSession.h | 58 ++++++
- .../ObjectCacheStore.cc | 172 ++++++++++++++++
- .../ceph_immutable_object_cache/ObjectCacheStore.h | 70 +++++++
- src/tools/ceph_immutable_object_cache/Policy.hpp | 33 +++
- .../ceph_immutable_object_cache/SimplePolicy.hpp | 163 +++++++++++++++
- .../ceph_immutable_object_cache/SocketCommon.h | 54 +++++
- src/tools/ceph_immutable_object_cache/main.cc | 85 ++++++++
- src/tools/rbd_cache/CMakeLists.txt | 9 -
- src/tools/rbd_cache/CacheController.cc | 116 -----------
- src/tools/rbd_cache/CacheController.h | 54 -----
- src/tools/rbd_cache/CacheControllerSocket.hpp | 228 --------------------
- .../rbd_cache/CacheControllerSocketClient.hpp | 229 ---------------------
- src/tools/rbd_cache/CacheControllerSocketCommon.h | 62 ------
- src/tools/rbd_cache/ObjectCacheStore.cc | 172 ----------------
- src/tools/rbd_cache/ObjectCacheStore.h | 70 -------
- src/tools/rbd_cache/Policy.hpp | 30 ---
- src/tools/rbd_cache/SimplePolicy.hpp | 160 --------------
- src/tools/rbd_cache/main.cc | 85 --------
- 35 files changed, 1646 insertions(+), 1529 deletions(-)
- delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
- delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
- create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.cc
- create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.h
- create mode 100644 src/tools/ceph_immutable_object_cache/CMakeLists.txt
- create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.cc
- create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.h
- create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.cc
- create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.h
- create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.cc
- create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.h
- create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.cc
- create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.h
- create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
- create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
- create mode 100644 src/tools/ceph_immutable_object_cache/Policy.hpp
- create mode 100644 src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
- create mode 100644 src/tools/ceph_immutable_object_cache/SocketCommon.h
- create mode 100644 src/tools/ceph_immutable_object_cache/main.cc
- delete mode 100644 src/tools/rbd_cache/CMakeLists.txt
- delete mode 100644 src/tools/rbd_cache/CacheController.cc
- delete mode 100644 src/tools/rbd_cache/CacheController.h
- delete mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp
- delete mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp
- delete mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h
- delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc
- delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.h
- delete mode 100644 src/tools/rbd_cache/Policy.hpp
- delete mode 100644 src/tools/rbd_cache/SimplePolicy.hpp
- delete mode 100644 src/tools/rbd_cache/main.cc
-
-diff --git a/src/common/options.cc b/src/common/options.cc
-index 3172744..bf00aab 100644
---- a/src/common/options.cc
-+++ b/src/common/options.cc
-@@ -6358,7 +6358,7 @@ static std::vector<Option> get_rbd_options() {
- .set_description("time in seconds for detecting a hung thread"),
-
- Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
-- .set_default(true)
-+ .set_default(false)
- .set_description("whether to enable shared ssd caching"),
-
- Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED)
-diff --git a/src/common/subsys.h b/src/common/subsys.h
-index bdd2d0e..5b532c1 100644
---- a/src/common/subsys.h
-+++ b/src/common/subsys.h
-@@ -36,9 +36,10 @@ SUBSYS(objecter, 0, 1)
- SUBSYS(rados, 0, 5)
- SUBSYS(rbd, 0, 5)
- SUBSYS(rbd_mirror, 0, 5)
--SUBSYS(rbd_replay, 0, 5)
- SUBSYS(journaler, 0, 5)
- SUBSYS(objectcacher, 0, 5)
-+SUBSYS(immutable_obj_cache, 0, 5)
-+SUBSYS(rbd_replay, 0, 5)
- SUBSYS(client, 0, 5)
- SUBSYS(osd, 1, 5)
- SUBSYS(optracker, 0, 5)
-diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt
-index 540ee78..c9bfb6f 100644
---- a/src/librbd/CMakeLists.txt
-+++ b/src/librbd/CMakeLists.txt
-@@ -32,7 +32,7 @@ set(librbd_internal_srcs
- api/Snapshot.cc
- cache/ImageWriteback.cc
- cache/ObjectCacherObjectDispatch.cc
-- cache/SharedPersistentObjectCacherObjectDispatch.cc
-+ cache/SharedReadOnlyObjectDispatch.cc
- cache/SharedPersistentObjectCacher.cc
- cache/SharedPersistentObjectCacherFile.cc
- deep_copy/ImageCopyRequest.cc
-@@ -125,6 +125,7 @@ set(librbd_internal_srcs
- trash/MoveRequest.cc
- watcher/Notifier.cc
- watcher/RewatchRequest.cc
-+ ${CMAKE_SOURCE_DIR}/src/tools/ceph_immutable_object_cache/CacheClient.cc
- ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc)
-
- add_library(rbd_api STATIC librbd.cc)
-diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
-deleted file mode 100644
-index 7cbc019..0000000
---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
-+++ /dev/null
-@@ -1,175 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h"
--#include "common/WorkQueue.h"
--#include "librbd/ImageCtx.h"
--#include "librbd/Journal.h"
--#include "librbd/Utils.h"
--#include "librbd/LibrbdWriteback.h"
--#include "librbd/io/ObjectDispatchSpec.h"
--#include "librbd/io/ObjectDispatcher.h"
--#include "librbd/io/Utils.h"
--#include "osd/osd_types.h"
--#include "osdc/WritebackHandler.h"
--#include <vector>
--
--#define dout_subsys ceph_subsys_rbd
--#undef dout_prefix
--#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \
-- << this << " " << __func__ << ": "
--
--namespace librbd {
--namespace cache {
--
--template <typename I>
--SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch(
-- I* image_ctx) : m_image_ctx(image_ctx) {
--}
--
--template <typename I>
--SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
-- delete m_object_store;
-- delete m_cache_client;
--}
--
--// TODO if connect fails, init will return error to high layer.
--template <typename I>
--void SharedPersistentObjectCacherObjectDispatch<I>::init() {
-- auto cct = m_image_ctx->cct;
-- ldout(cct, 5) << dendl;
--
-- if (m_image_ctx->parent != nullptr) {
-- //TODO(): should we cover multi-leveled clone?
-- ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
-- return;
-- }
--
-- ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
--
-- std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
-- m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
-- ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
--
-- int ret = m_cache_client->connect();
-- if (ret < 0) {
-- ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
-- << "please start rbd-cache daemon"
-- << dendl;
-- } else {
-- ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
-- << "name = " << m_image_ctx->id
-- << dendl;
--
-- ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
-- m_image_ctx->id, m_image_ctx->size);
--
-- if (ret >= 0) {
-- // add ourself to the IO object dispatcher chain
-- m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
-- }
-- }
--}
--
--template <typename I>
--bool SharedPersistentObjectCacherObjectDispatch<I>::read(
-- const std::string &oid, uint64_t object_no, uint64_t object_off,
-- uint64_t object_len, librados::snap_t snap_id, int op_flags,
-- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
-- io::ExtentMap* extent_map, int* object_dispatch_flags,
-- io::DispatchResult* dispatch_result, Context** on_finish,
-- Context* on_dispatched) {
--
-- // IO chained in reverse order
--
-- // Now, policy is : when session have any error, later all read will dispatched to rados layer.
-- if(!m_cache_client->is_session_work()) {
-- *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-- on_dispatched->complete(0);
-- return true;
-- // TODO : domain socket have error, all read operation will dispatched to rados layer.
-- }
--
-- auto cct = m_image_ctx->cct;
-- ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
-- << object_len << dendl;
--
--
-- on_dispatched = util::create_async_context_callback(*m_image_ctx,
-- on_dispatched);
-- auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
-- handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
-- });
--
-- if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
-- m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
-- m_image_ctx->id, oid, ctx);
-- }
-- return true;
--}
--
--template <typename I>
--int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
-- bool cache,
-- const std::string &oid, uint64_t object_off, uint64_t object_len,
-- ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
-- Context* on_dispatched) {
-- // IO chained in reverse order
-- auto cct = m_image_ctx->cct;
-- ldout(cct, 20) << dendl;
--
-- // try to read from parent image
-- if (cache) {
-- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
-- //int r = object_len;
-- if (r != 0) {
-- *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
-- //TODO(): complete in syncfile
-- on_dispatched->complete(r);
-- ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
-- return true;
-- }
-- } else {
-- *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-- on_dispatched->complete(0);
-- ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
-- return false;
-- }
--}
--template <typename I>
--void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
-- auto cct = m_image_ctx->cct;
-- ldout(cct, 20) << dendl;
--
-- rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
--
-- switch (io_ctx->type) {
-- case rbd::cache::RBDSC_REGISTER_REPLY: {
-- // open cache handler for volume
-- ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
-- m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
--
-- break;
-- }
-- case rbd::cache::RBDSC_READ_REPLY: {
-- ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
-- //TODO(): should call read here
--
-- break;
-- }
-- case rbd::cache::RBDSC_READ_RADOS: {
-- ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
-- //TODO(): should call read here
--
-- break;
-- }
-- default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
-- break;
--
-- }
--}
--
--} // namespace cache
--} // namespace librbd
--
--template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
-diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
-deleted file mode 100644
-index 5685244..0000000
---- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
-+++ /dev/null
-@@ -1,133 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
--#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
--
--#include "librbd/io/ObjectDispatchInterface.h"
--#include "common/Mutex.h"
--#include "osdc/ObjectCacher.h"
--#include "tools/rbd_cache/CacheControllerSocketClient.hpp"
--#include "SharedPersistentObjectCacher.h"
--
--struct WritebackHandler;
--
--namespace librbd {
--
--class ImageCtx;
--
--namespace cache {
--
--/**
-- * Facade around the OSDC object cacher to make it align with
-- * the object dispatcher interface
-- */
--template <typename ImageCtxT = ImageCtx>
--class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface {
--public:
-- static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) {
-- return new SharedPersistentObjectCacherObjectDispatch(image_ctx);
-- }
--
-- SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx);
-- ~SharedPersistentObjectCacherObjectDispatch() override;
--
-- io::ObjectDispatchLayer get_object_dispatch_layer() const override {
-- return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
-- }
--
-- void init();
-- void shut_down(Context* on_finish) {
-- m_image_ctx->op_work_queue->queue(on_finish, 0);
-- }
--
-- bool read(
-- const std::string &oid, uint64_t object_no, uint64_t object_off,
-- uint64_t object_len, librados::snap_t snap_id, int op_flags,
-- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
-- io::ExtentMap* extent_map, int* object_dispatch_flags,
-- io::DispatchResult* dispatch_result, Context** on_finish,
-- Context* on_dispatched) override;
--
-- bool discard(
-- const std::string &oid, uint64_t object_no, uint64_t object_off,
-- uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
-- const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-- uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-- Context** on_finish, Context* on_dispatched) {
-- return false;
-- }
--
-- bool write(
-- const std::string &oid, uint64_t object_no, uint64_t object_off,
-- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
-- const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-- uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-- Context** on_finish, Context* on_dispatched) {
-- return false;
-- }
--
-- bool write_same(
-- const std::string &oid, uint64_t object_no, uint64_t object_off,
-- uint64_t object_len, io::Extents&& buffer_extents,
-- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
-- const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-- uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-- Context** on_finish, Context* on_dispatched) {
-- return false;
-- }
--
-- bool compare_and_write(
-- const std::string &oid, uint64_t object_no, uint64_t object_off,
-- ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
-- const ::SnapContext &snapc, int op_flags,
-- const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
-- int* object_dispatch_flags, uint64_t* journal_tid,
-- io::DispatchResult* dispatch_result, Context** on_finish,
-- Context* on_dispatched) {
-- return false;
-- }
--
-- bool flush(
-- io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
-- io::DispatchResult* dispatch_result, Context** on_finish,
-- Context* on_dispatched) {
-- return false;
-- }
--
-- bool invalidate_cache(Context* on_finish) {
-- return false;
-- }
--
-- bool reset_existence_cache(Context* on_finish) {
-- return false;
-- }
--
-- void extent_overwritten(
-- uint64_t object_no, uint64_t object_off, uint64_t object_len,
-- uint64_t journal_tid, uint64_t new_journal_tid) {
-- }
--
-- SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
--
--private:
--
-- int handle_read_cache(
-- bool cache,
-- const std::string &oid, uint64_t object_off,
-- uint64_t object_len, ceph::bufferlist* read_data,
-- io::DispatchResult* dispatch_result,
-- Context* on_dispatched);
-- void client_handle_request(std::string msg);
--
-- ImageCtxT* m_image_ctx;
--
-- rbd::cache::CacheClient *m_cache_client = nullptr;
--};
--
--} // namespace cache
--} // namespace librbd
--
--extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
--
--#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H
-diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.cc b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc
-new file mode 100644
-index 0000000..23c7dbe
---- /dev/null
-+++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc
-@@ -0,0 +1,170 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#include "common/WorkQueue.h"
-+#include "librbd/ImageCtx.h"
-+#include "librbd/Journal.h"
-+#include "librbd/Utils.h"
-+#include "librbd/LibrbdWriteback.h"
-+#include "librbd/io/ObjectDispatchSpec.h"
-+#include "librbd/io/ObjectDispatcher.h"
-+#include "librbd/io/Utils.h"
-+#include "librbd/cache/SharedReadOnlyObjectDispatch.h"
-+#include "osd/osd_types.h"
-+#include "osdc/WritebackHandler.h"
-+
-+#include <vector>
-+
-+#define dout_subsys ceph_subsys_rbd
-+#undef dout_prefix
-+#define dout_prefix *_dout << "librbd::cache::SharedReadOnlyObjectDispatch: " \
-+ << this << " " << __func__ << ": "
-+
-+namespace librbd {
-+namespace cache {
-+
-+template <typename I>
-+SharedReadOnlyObjectDispatch<I>::SharedReadOnlyObjectDispatch(
-+ I* image_ctx) : m_image_ctx(image_ctx) {
-+}
-+
-+template <typename I>
-+SharedReadOnlyObjectDispatch<I>::~SharedReadOnlyObjectDispatch() {
-+ delete m_object_store;
-+ delete m_cache_client;
-+}
-+
-+// TODO: if connect fails, init should return an error to the higher layer.
-+template <typename I>
-+void SharedReadOnlyObjectDispatch<I>::init() {
-+ auto cct = m_image_ctx->cct;
-+ ldout(cct, 5) << dendl;
-+
-+ if (m_image_ctx->parent != nullptr) {
-+    //TODO(): should we cover multi-level clones?
-+ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
-+ return;
-+ }
-+
-+ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
-+
-+ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
-+ m_cache_client = new ceph::immutable_obj_cache::CacheClient(controller_path.c_str(),
-+ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
-+
-+ int ret = m_cache_client->connect();
-+ if (ret < 0) {
-+ ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
-+ << "please start rbd-cache daemon"
-+ << dendl;
-+ } else {
-+ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
-+ << "name = " << m_image_ctx->id
-+ << dendl;
-+
-+ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
-+ m_image_ctx->id, m_image_ctx->size);
-+
-+ if (ret >= 0) {
-+ // add ourself to the IO object dispatcher chain
-+ m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
-+ }
-+ }
-+}
-+
-+template <typename I>
-+bool SharedReadOnlyObjectDispatch<I>::read(
-+ const std::string &oid, uint64_t object_no, uint64_t object_off,
-+ uint64_t object_len, librados::snap_t snap_id, int op_flags,
-+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
-+ io::ExtentMap* extent_map, int* object_dispatch_flags,
-+ io::DispatchResult* dispatch_result, Context** on_finish,
-+ Context* on_dispatched) {
-+ auto cct = m_image_ctx->cct;
-+ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
-+ << object_len << dendl;
-+
-+  // once the session fails, all later reads are dispatched to the rados layer
-+ if(!m_cache_client->is_session_work()) {
-+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-+ on_dispatched->complete(0);
-+ return true;
-+ // TODO(): fix domain socket error
-+ }
-+
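-+  // ask the cache daemon whether this object has been promoted; the
-+  // callback then reads from the local cache file or falls back to rados.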
-+ auto ctx = new FunctionContext([this, oid, object_off, object_len,
-+ read_data, dispatch_result, on_dispatched](bool cache) {
-+ handle_read_cache(cache, oid, object_off, object_len,
-+ read_data, dispatch_result, on_dispatched);
-+ });
-+
-+ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
-+ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
-+ m_image_ctx->id, oid, ctx);
-+ }
-+ return true;
-+}
-+
-+template <typename I>
-+int SharedReadOnlyObjectDispatch<I>::handle_read_cache(
-+ bool cache, const std::string &oid, uint64_t object_off,
-+ uint64_t object_len, ceph::bufferlist* read_data,
-+ io::DispatchResult* dispatch_result, Context* on_dispatched) {
-+ auto cct = m_image_ctx->cct;
-+ ldout(cct, 20) << dendl;
-+
-+ // try to read from parent image
-+ if (cache) {
-+ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
-+ //int r = object_len;
-+ if (r != 0) {
-+ *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
-+ //TODO(): complete in syncfile
-+ on_dispatched->complete(r);
-+ ldout(cct, 20) << "read cache: " << *dispatch_result <<dendl;
-+ return true;
-+ }
-+ } else {
-+ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-+ on_dispatched->complete(0);
-+ ldout(cct, 20) << "read rados: " << *dispatch_result <<dendl;
-+ return false;
-+  }
-+  return false;
-+}
-+
-+template <typename I>
-+void SharedReadOnlyObjectDispatch<I>::client_handle_request(std::string msg) {
-+ auto cct = m_image_ctx->cct;
-+ ldout(cct, 20) << dendl;
-+
-+ ceph::immutable_obj_cache::rbdsc_req_type_t *io_ctx = (ceph::immutable_obj_cache::rbdsc_req_type_t*)(msg.c_str());
-+
-+ switch (io_ctx->type) {
-+ case ceph::immutable_obj_cache::RBDSC_REGISTER_REPLY: {
-+ // open cache handler for volume
-+ ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
-+ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
-+
-+ break;
-+ }
-+ case ceph::immutable_obj_cache::RBDSC_READ_REPLY: {
-+ ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
-+ //TODO(): should call read here
-+
-+ break;
-+ }
-+ case ceph::immutable_obj_cache::RBDSC_READ_RADOS: {
-+ ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
-+ //TODO(): should call read here
-+
-+ break;
-+ }
-+ default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
-+ break;
-+
-+ }
-+}
-+
-+} // namespace cache
-+} // namespace librbd
-+
-+template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>;
-diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.h b/src/librbd/cache/SharedReadOnlyObjectDispatch.h
-new file mode 100644
-index 0000000..9b56da9
---- /dev/null
-+++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.h
-@@ -0,0 +1,126 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_LIBRBD_CACHE_SHARED_READ_ONLY_OBJECT_DISPATCH_H
-+#define CEPH_LIBRBD_CACHE_SHARED_READ_ONLY_OBJECT_DISPATCH_H
-+
-+#include "common/Mutex.h"
-+#include "SharedPersistentObjectCacher.h"
-+#include "librbd/io/ObjectDispatchInterface.h"
-+#include "tools/ceph_immutable_object_cache/CacheClient.h"
-+
-+
-+namespace librbd {
-+
-+class ImageCtx;
-+
-+namespace cache {
-+
-+template <typename ImageCtxT = ImageCtx>
-+class SharedReadOnlyObjectDispatch : public io::ObjectDispatchInterface {
-+public:
-+ static SharedReadOnlyObjectDispatch* create(ImageCtxT* image_ctx) {
-+ return new SharedReadOnlyObjectDispatch(image_ctx);
-+ }
-+
-+ SharedReadOnlyObjectDispatch(ImageCtxT* image_ctx);
-+ ~SharedReadOnlyObjectDispatch() override;
-+
-+ io::ObjectDispatchLayer get_object_dispatch_layer() const override {
-+ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
-+ }
-+
-+ void init();
-+ void shut_down(Context* on_finish) {
-+ m_image_ctx->op_work_queue->queue(on_finish, 0);
-+ }
-+
-+ bool read(
-+ const std::string &oid, uint64_t object_no, uint64_t object_off,
-+ uint64_t object_len, librados::snap_t snap_id, int op_flags,
-+ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
-+ io::ExtentMap* extent_map, int* object_dispatch_flags,
-+ io::DispatchResult* dispatch_result, Context** on_finish,
-+ Context* on_dispatched) override;
-+
-+ bool discard(
-+ const std::string &oid, uint64_t object_no, uint64_t object_off,
-+ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
-+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-+ Context** on_finish, Context* on_dispatched) {
-+ return false;
-+ }
-+
-+ bool write(
-+ const std::string &oid, uint64_t object_no, uint64_t object_off,
-+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
-+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-+ Context** on_finish, Context* on_dispatched) {
-+ return false;
-+ }
-+
-+ bool write_same(
-+ const std::string &oid, uint64_t object_no, uint64_t object_off,
-+ uint64_t object_len, io::Extents&& buffer_extents,
-+ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
-+ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
-+ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
-+ Context** on_finish, Context* on_dispatched) {
-+ return false;
-+ }
-+
-+ bool compare_and_write(
-+ const std::string &oid, uint64_t object_no, uint64_t object_off,
-+ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
-+ const ::SnapContext &snapc, int op_flags,
-+ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
-+ int* object_dispatch_flags, uint64_t* journal_tid,
-+ io::DispatchResult* dispatch_result, Context** on_finish,
-+ Context* on_dispatched) {
-+ return false;
-+ }
-+
-+ bool flush(
-+ io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
-+ io::DispatchResult* dispatch_result, Context** on_finish,
-+ Context* on_dispatched) {
-+ return false;
-+ }
-+
-+ bool invalidate_cache(Context* on_finish) {
-+ return false;
-+ }
-+
-+ bool reset_existence_cache(Context* on_finish) {
-+ return false;
-+ }
-+
-+ void extent_overwritten(
-+ uint64_t object_no, uint64_t object_off, uint64_t object_len,
-+ uint64_t journal_tid, uint64_t new_journal_tid) {
-+ }
-+
-+private:
-+
-+ int handle_read_cache(
-+ bool cache,
-+ const std::string &oid, uint64_t object_off,
-+ uint64_t object_len, ceph::bufferlist* read_data,
-+ io::DispatchResult* dispatch_result,
-+ Context* on_dispatched);
-+ void client_handle_request(std::string msg);
-+
-+ ImageCtxT* m_image_ctx;
-+
-+ ceph::immutable_obj_cache::CacheClient *m_cache_client = nullptr;
-+ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
-+};
-+
-+} // namespace cache
-+} // namespace librbd
-+
-+extern template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>;
-+
-+#endif // CEPH_LIBRBD_CACHE_SHARED_READ_ONLY_OBJECT_DISPATCH_H
-diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc
-index 30a7b66..57ce92f 100644
---- a/src/librbd/image/OpenRequest.cc
-+++ b/src/librbd/image/OpenRequest.cc
-@@ -8,7 +8,7 @@
- #include "librbd/ImageCtx.h"
- #include "librbd/Utils.h"
- #include "librbd/cache/ObjectCacherObjectDispatch.h"
--#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc"
-+#include "librbd/cache/SharedReadOnlyObjectDispatch.cc"
- #include "librbd/image/CloseRequest.h"
- #include "librbd/image/RefreshRequest.h"
- #include "librbd/image/SetSnapRequest.h"
-@@ -457,7 +457,7 @@ Context *OpenRequest<I>::send_init_cache(int *result) {
- // enable Shared Read-only cache for parent image
- if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) {
- ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl;
-- auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx);
-+ auto sro_cache = cache::SharedReadOnlyObjectDispatch<I>::create(m_image_ctx);
- sro_cache->init();
- }
-
-diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
-index 72ab342..f7c5872 100644
---- a/src/tools/CMakeLists.txt
-+++ b/src/tools/CMakeLists.txt
-@@ -99,7 +99,6 @@ endif(WITH_CEPHFS)
- if(WITH_RBD)
- add_subdirectory(rbd)
- add_subdirectory(rbd_mirror)
-- add_subdirectory(rbd_cache)
- if(LINUX)
- add_subdirectory(rbd_nbd)
- endif()
-@@ -108,4 +107,5 @@ if(WITH_RBD)
- endif()
- endif(WITH_RBD)
-
-+add_subdirectory(ceph_immutable_object_cache)
- add_subdirectory(ceph-dencoder)
-diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
-new file mode 100644
-index 0000000..c7c7af3
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
-@@ -0,0 +1,11 @@
-+add_executable(ceph-immutable-object-cache
-+ ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc
-+ ObjectCacheStore.cc
-+ CacheController.cc
-+ CacheServer.cc
-+ CacheSession.cc
-+ main.cc)
-+target_link_libraries(ceph-immutable-object-cache
-+ librados
-+ global)
-+install(TARGETS ceph-immutable-object-cache DESTINATION bin)
-diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc
-new file mode 100644
-index 0000000..a7116bf
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CacheClient.cc
-@@ -0,0 +1,205 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#include "CacheClient.h"
-+
-+#define dout_context g_ceph_context
-+#define dout_subsys ceph_subsys_immutable_obj_cache
-+#undef dout_prefix
-+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \
-+ << __func__ << ": "
-+
-+
-+using boost::asio::local::stream_protocol;
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+ CacheClient::CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
-+ : m_io_service_work(m_io_service),
-+ m_dm_socket(m_io_service),
-+ m_client_process_msg(processmsg),
-+ m_ep(stream_protocol::endpoint(file)),
-+ m_session_work(false),
-+ cct(ceph_ctx)
-+ {
-+    // TODO: wrap the io_service
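-+    // run the io_service on a detached background thread for async I/O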
-+ std::thread thd([this](){m_io_service.run();});
-+ thd.detach();
-+ }
-+
-+ void CacheClient::run(){
-+ }
-+
-+ bool CacheClient::is_session_work() {
-+ return m_session_work.load() == true;
-+ }
-+
-+  // called only when an error occurs.
-+ void CacheClient::close() {
-+ m_session_work.store(false);
-+ boost::system::error_code close_ec;
-+ m_dm_socket.close(close_ec);
-+ if(close_ec) {
-+ ldout(cct, 20) << "close: " << close_ec.message() << dendl;
-+ }
-+ ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl;
-+ }
-+
-+ int CacheClient::connect() {
-+ boost::system::error_code ec;
-+ m_dm_socket.connect(m_ep, ec);
-+ if(ec) {
-+ if(ec == boost::asio::error::connection_refused) {
-+ ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. "
-+ << "Now data will be read from ceph cluster " << dendl;
-+ } else {
-+ ldout(cct, 20) << "connect: " << ec.message() << dendl;
-+ }
-+
-+ if(m_dm_socket.is_open()) {
-+ // Set to indicate what error occurred, if any.
-+ // Note that, even if the function indicates an error,
-+ // the underlying descriptor is closed.
-+ boost::system::error_code close_ec;
-+ m_dm_socket.close(close_ec);
-+ if(close_ec) {
-+ ldout(cct, 20) << "close: " << close_ec.message() << dendl;
-+ }
-+ }
-+ return -1;
-+ }
-+
-+ ldout(cct, 20) <<"connect success"<< dendl;
-+
-+ return 0;
-+ }
-+
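-+  // synchronous handshake: send RBDSC_REGISTER and block until the
-+  // controller replies, then mark the session as working.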
-+ int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
-+ // cache controller will init layout
-+ rbdsc_req_type_t *message = new rbdsc_req_type_t();
-+ message->type = RBDSC_REGISTER;
-+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
-+ memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
-+ message->vol_size = vol_size;
-+ message->offset = 0;
-+ message->length = 0;
-+
-+ uint64_t ret;
-+ boost::system::error_code ec;
-+
-+ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
-+ if(ec) {
-+ ldout(cct, 20) << "write fails : " << ec.message() << dendl;
-+ return -1;
-+ }
-+
-+ if(ret != message->size()) {
-+ ldout(cct, 20) << "write fails : ret != send_bytes " << dendl;
-+ return -1;
-+ }
-+
-+    // TODO: hard-coded message length
-+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
-+ if(ec == boost::asio::error::eof) {
-+ ldout(cct, 20) << "recv eof" << dendl;
-+ return -1;
-+ }
-+
-+ if(ec) {
-+ ldout(cct, 20) << "write fails : " << ec.message() << dendl;
-+ return -1;
-+ }
-+
-+ if(ret != RBDSC_MSG_LEN) {
-+ ldout(cct, 20) << "write fails : ret != receive bytes " << dendl;
-+ return -1;
-+ }
-+
-+ m_client_process_msg(std::string(m_recv_buffer, ret));
-+
-+ delete message;
-+
-+ ldout(cct, 20) << "register volume success" << dendl;
-+
-+ // TODO
-+ m_session_work.store(true);
-+
-+ return 0;
-+ }
-+
-+  // on any error we just return false; the read then falls back to rados.
-+ int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
-+ rbdsc_req_type_t *message = new rbdsc_req_type_t();
-+ message->type = RBDSC_READ;
-+ memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
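-+    // NOTE: the vol_name field is reused to carry the object id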
-+ memcpy(message->vol_name, object_id.c_str(), object_id.size());
-+ message->vol_size = 0;
-+ message->offset = 0;
-+ message->length = 0;
-+
-+ boost::asio::async_write(m_dm_socket,
-+ boost::asio::buffer((char*)message, message->size()),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-+ [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
-+ delete message;
-+ if(err) {
-+ ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl;
-+ close();
-+ on_finish->complete(false);
-+ return;
-+ }
-+ if(cb != RBDSC_MSG_LEN) {
-+ ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl;
-+ close();
-+ on_finish->complete(false);
-+ return;
-+ }
-+ get_result(on_finish);
-+ });
-+
-+ return 0;
-+ }
-+
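-+  // read the fixed-length ack and complete on_finish with true only on a
-+  // cache hit (RBDSC_READ_REPLY).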
-+ void CacheClient::get_result(Context* on_finish) {
-+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-+ [this, on_finish](const boost::system::error_code& err, size_t cb) {
-+ if(err == boost::asio::error::eof) {
-+ ldout(cct, 20) << "get_result: ack is EOF." << dendl;
-+ close();
-+ on_finish->complete(false);
-+ return;
-+ }
-+ if(err) {
-+ ldout(cct, 20) << "get_result: async_read fails:" << err.message() << dendl;
-+ close();
-+        on_finish->complete(false); // TODO: replace this with proper error handling.
-+ return;
-+ }
-+      if (cb != RBDSC_MSG_LEN) {
-+        close();
-+        ldout(cct, 20) << "get_result: incomplete ack." << dendl;
-+        on_finish->complete(false); // TODO: replace this with proper error handling.
-+        return;
-+      }
-+
-+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
-+
-+      // TODO: reproduce Yuan's bug
-+ if(io_ctx->type == RBDSC_READ) {
-+ ldout(cct, 20) << "get rbdsc_read... " << dendl;
-+ assert(0);
-+ }
-+
-+ if (io_ctx->type == RBDSC_READ_REPLY) {
-+ on_finish->complete(true);
-+ return;
-+ } else {
-+ on_finish->complete(false);
-+ return;
-+ }
-+ });
-+ }
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h
-new file mode 100644
-index 0000000..d82ab8f
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CacheClient.h
-@@ -0,0 +1,53 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_CACHE_CLIENT_H
-+#define CEPH_CACHE_CLIENT_H
-+
-+#include <atomic>
-+#include <boost/asio.hpp>
-+#include <boost/bind.hpp>
-+#include <boost/asio/error.hpp>
-+#include <boost/algorithm/string.hpp>
-+#include "librbd/ImageCtx.h"
-+#include "include/assert.h"
-+#include "include/Context.h"
-+#include "SocketCommon.h"
-+
-+
-+using boost::asio::local::stream_protocol;
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+class CacheClient {
-+public:
-+ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx);
-+ void run();
-+ bool is_session_work();
-+
-+ void close();
-+ int connect();
-+
-+ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size);
-+ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish);
-+ void get_result(Context* on_finish);
-+
-+private:
-+ boost::asio::io_service m_io_service;
-+ boost::asio::io_service::work m_io_service_work;
-+ stream_protocol::socket m_dm_socket;
-+ ClientProcessMsg m_client_process_msg;
-+ stream_protocol::endpoint m_ep;
-+ char m_recv_buffer[1024];
-+
-+  // this variable is modified atomically:
-+  // thread 1: the asio callback thread modifies it.
-+  // thread 2: librbd reads it.
-+ std::atomic<bool> m_session_work;
-+ CephContext* cct;
-+};
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+#endif
-diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc
-new file mode 100644
-index 0000000..cb636d2
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CacheController.cc
-@@ -0,0 +1,117 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#include "CacheController.h"
-+
-+#define dout_context g_ceph_context
-+#define dout_subsys ceph_subsys_immutable_obj_cache
-+#undef dout_prefix
-+#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \
-+ << __func__ << ": "
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+class ThreadPoolSingleton : public ThreadPool {
-+public:
-+ ContextWQ *op_work_queue;
-+
-+ explicit ThreadPoolSingleton(CephContext *cct)
-+ : ThreadPool(cct, "ceph::cache::thread_pool", "tp_librbd_cache", 32,
-+ "pcache_threads"),
-+ op_work_queue(new ContextWQ("ceph::pcache_op_work_queue",
-+ cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"),
-+ this)) {
-+ start();
-+ }
-+ ~ThreadPoolSingleton() override {
-+ op_work_queue->drain();
-+ delete op_work_queue;
-+
-+ stop();
-+ }
-+};
-+
-+
-+CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
-+ m_args(args), m_cct(cct) {
-+
-+}
-+
-+CacheController::~CacheController() {
-+
-+}
-+
-+int CacheController::init() {
-+ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
-+ "ceph::cache::thread_pool", false, m_cct);
-+ pcache_op_work_queue = thread_pool_singleton->op_work_queue;
-+
-+ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
-+ //TODO(): make this configurable
-+ int r = m_object_cache_store->init(true);
-+ if (r < 0) {
-+ lderr(m_cct) << "init error\n" << dendl;
-+ }
-+ return r;
-+}
-+
-+int CacheController::shutdown() {
-+ int r = m_object_cache_store->shutdown();
-+ return r;
-+}
-+
-+void CacheController::handle_signal(int signum){}
-+
-+void CacheController::run() {
-+ try {
-+ //TODO(): use new socket path
-+ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
-+ std::remove(controller_path.c_str());
-+
-+ m_cache_server = new CacheServer(controller_path,
-+ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
-+ m_cache_server->run();
-+ } catch (std::exception& e) {
-+ lderr(m_cct) << "Exception: " << e.what() << dendl;
-+ }
-+}
-+
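-+// dispatch one client message: volume registration or object lookup.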
-+void CacheController::handle_request(uint64_t session_id, std::string msg){
-+ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
-+
-+ int ret = 0;
-+
-+ switch (io_ctx->type) {
-+ case RBDSC_REGISTER: {
-+ // init cache layout for volume
-+ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
-+ io_ctx->type = RBDSC_REGISTER_REPLY;
-+ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
-+
-+ break;
-+ }
-+ case RBDSC_READ: {
-+ // lookup object in local cache store
-+ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
-+      if (ret < 0) {
-+        // cache miss or promotion still in flight: client reads from rados
-+        io_ctx->type = RBDSC_READ_RADOS;
-+      } else {
-+        io_ctx->type = RBDSC_READ_REPLY;
-+      }
-+      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
-+
-+ break;
-+ }
-+ ldout(m_cct, 5) << "can't recongize request" << dendl;
-+ assert(0); // TODO replace it.
-+ }
-+}
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+
-+
-diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h
-new file mode 100644
-index 0000000..837fe36
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CacheController.h
-@@ -0,0 +1,53 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_CACHE_CONTROLLER_H
-+#define CEPH_CACHE_CONTROLLER_H
-+
-+#include "common/Formatter.h"
-+#include "common/admin_socket.h"
-+#include "common/debug.h"
-+#include "common/errno.h"
-+#include "common/ceph_context.h"
-+#include "common/Mutex.h"
-+#include "common/WorkQueue.h"
-+#include "include/rados/librados.hpp"
-+#include "include/rbd/librbd.h"
-+#include "include/assert.h"
-+#include "librbd/ImageCtx.h"
-+#include "librbd/ImageState.h"
-+#include "CacheServer.h"
-+#include "ObjectCacheStore.h"
-+
-+#include <thread>
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+class CacheController {
-+ public:
-+ CacheController(CephContext *cct, const std::vector<const char*> &args);
-+ ~CacheController();
-+
-+ int init();
-+
-+ int shutdown();
-+
-+  void handle_signal(int signum);
-+
-+ void run();
-+
-+  void handle_request(uint64_t session_id, std::string msg);
-+
-+ private:
-+ CacheServer *m_cache_server;
-+ std::vector<const char*> m_args;
-+ CephContext *m_cct;
-+ ObjectCacheStore *m_object_cache_store;
-+ ContextWQ* pcache_op_work_queue;
-+};
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+
-+#endif
-diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc
-new file mode 100644
-index 0000000..dd2d47e
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CacheServer.cc
-@@ -0,0 +1,99 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#include "common/debug.h"
-+#include "common/ceph_context.h"
-+#include "CacheServer.h"
-+
-+#define dout_context g_ceph_context
-+#define dout_subsys ceph_subsys_immutable_obj_cache
-+#undef dout_prefix
-+#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \
-+ << __func__ << ": "
-+
-+
-+using boost::asio::local::stream_protocol;
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+CacheServer::CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
-+ : cct(cct), m_server_process_msg(processmsg),
-+ m_local_path(file), m_acceptor(m_io_service) {}
-+
-+CacheServer::~CacheServer(){}
-+
-+void CacheServer::run() {
-+ bool ret;
-+ ret = start_accept();
-+ if(!ret) {
-+ return;
-+ }
-+ m_io_service.run();
-+}
-+
-+// TODO: replace this function with a callback.
-+void CacheServer::send(uint64_t session_id, std::string msg) {
-+ auto it = m_session_map.find(session_id);
-+ if (it != m_session_map.end()) {
-+ it->second->send(msg);
-+ } else {
-+    // TODO: why is the session id missing from the map?
-+    ldout(cct, 20) << "session id not found..." << dendl;
-+ assert(0);
-+ }
-+}
-+
-+// creating the acceptor manually lets us control every setup step.
-+bool CacheServer::start_accept() {
-+ boost::system::error_code ec;
-+ m_acceptor.open(m_local_path.protocol(), ec);
-+ if(ec) {
-+ ldout(cct, 20) << "m_acceptor open fails: " << ec.message() << dendl;
-+ return false;
-+ }
-+
-+  // TODO: control acceptor attributes.
-+
-+ m_acceptor.bind(m_local_path, ec);
-+ if(ec) {
-+ ldout(cct, 20) << "m_acceptor bind fails: " << ec.message() << dendl;
-+ return false;
-+ }
-+
-+ m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
-+ if(ec) {
-+ ldout(cct, 20) << "m_acceptor listen fails: " << ec.message() << dendl;
-+ return false;
-+ }
-+
-+ accept();
-+ return true;
-+}
-+
-+void CacheServer::accept() {
-+ CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
-+ m_acceptor.async_accept(new_session->socket(),
-+ boost::bind(&CacheServer::handle_accept, this, new_session,
-+ boost::asio::placeholders::error));
-+}
-+
-+void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) {
-+
-+ if(error) {
-+ lderr(cct) << "async accept fails : " << error.message() << dendl;
-+ assert(0); // TODO
-+ }
-+
-+ m_session_map.emplace(m_session_id, new_session);
-+ // TODO : session setting
-+ new_session->start();
-+ m_session_id++;
-+
-+  // launch the next accept
-+ accept();
-+}
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+
-diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h
-new file mode 100644
-index 0000000..6c5c133
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CacheServer.h
-@@ -0,0 +1,54 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_CACHE_SERVER_H
-+#define CEPH_CACHE_SERVER_H
-+
-+#include <cstdio>
-+#include <iostream>
-+#include <array>
-+#include <memory>
-+#include <string>
-+#include <boost/bind.hpp>
-+#include <boost/asio.hpp>
-+#include <boost/asio/error.hpp>
-+#include <boost/algorithm/string.hpp>
-+
-+#include "include/assert.h"
-+#include "SocketCommon.h"
-+#include "CacheSession.h"
-+
-+
-+using boost::asio::local::stream_protocol;
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+class CacheServer {
-+
-+ public:
-+ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct);
-+ ~CacheServer();
-+
-+ void run();
-+ void send(uint64_t session_id, std::string msg);
-+
-+ private:
-+ bool start_accept();
-+ void accept();
-+ void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error);
-+
-+ private:
-+ CephContext* cct;
-+ boost::asio::io_service m_io_service; // TODO wrapper it.
-+ ProcessMsg m_server_process_msg;
-+ stream_protocol::endpoint m_local_path;
-+ stream_protocol::acceptor m_acceptor;
-+ uint64_t m_session_id = 1;
-+ std::map<uint64_t, CacheSessionPtr> m_session_map;
-+};
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+
-+#endif
-diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc
-new file mode 100644
-index 0000000..6cffb41
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CacheSession.cc
-@@ -0,0 +1,115 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#include "common/debug.h"
-+#include "common/ceph_context.h"
-+#include "CacheSession.h"
-+
-+#define dout_context g_ceph_context
-+#define dout_subsys ceph_subsys_immutable_obj_cache
-+#undef dout_prefix
-+#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
-+ << __func__ << ": "
-+
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct)
-+ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct)
-+ {}
-+
-+CacheSession::~CacheSession(){}
-+
-+stream_protocol::socket& CacheSession::socket() {
-+ return m_dm_socket;
-+}
-+
-+void CacheSession::start() {
-+  if(true) {  // only serial request handling is implemented so far
-+ serial_handing_request();
-+ } else {
-+ parallel_handing_request();
-+ }
-+}
-+// flow:
-+//
-+// recv request --> process request --> reply ack
-+// | |
-+// --------------<-------------------------
-+void CacheSession::serial_handing_request() {
-+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-+ boost::bind(&CacheSession::handle_read,
-+ shared_from_this(),
-+ boost::asio::placeholders::error,
-+ boost::asio::placeholders::bytes_transferred));
-+}
-+
-+// flow :
-+//
-+// --> thread 1: process request
-+// recv request --> thread 2: process request --> reply ack
-+// --> thread n: process request
-+//
-+void CacheSession::parallel_handing_request() {
-+ // TODO
-+}
-+
-+void CacheSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
-+  // on EOF the client side has most likely closed the socket,
-+  // so the server side needs to stop handling requests.
-+ if(error == boost::asio::error::eof) {
-+ ldout(cct, 20) << "session: async_read : " << error.message() << dendl;
-+ return;
-+ }
-+
-+ if(error) {
-+ ldout(cct, 20) << "session: async_read fails: " << error.message() << dendl;
-+ assert(0);
-+ }
-+
-+ if(bytes_transferred != RBDSC_MSG_LEN) {
-+ ldout(cct, 20) << "session : request in-complete. "<<dendl;
-+ assert(0);
-+ }
-+
-+  // TODO: an async process helper would make this more readable.
-+  // the process_msg callback handles the async send.
-+ process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
-+}
-+
-+void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
-+ if (error) {
-+ ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl;
-+ assert(0);
-+ }
-+
-+ if(bytes_transferred != RBDSC_MSG_LEN) {
-+ ldout(cct, 20) << "session : reply in-complete. "<<dendl;
-+ assert(0);
-+ }
-+
-+ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-+ boost::bind(&CacheSession::handle_read,
-+ shared_from_this(),
-+ boost::asio::placeholders::error,
-+ boost::asio::placeholders::bytes_transferred));
-+
-+}
-+
-+void CacheSession::send(std::string msg) {
-+ boost::asio::async_write(m_dm_socket,
-+ boost::asio::buffer(msg.c_str(), msg.size()),
-+ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-+ boost::bind(&CacheSession::handle_write,
-+ shared_from_this(),
-+ boost::asio::placeholders::error,
-+ boost::asio::placeholders::bytes_transferred));
-+
-+}
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+
-diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h
-new file mode 100644
-index 0000000..ce2591b
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/CacheSession.h
-@@ -0,0 +1,58 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_CACHE_SESSION_H
-+#define CEPH_CACHE_SESSION_H
-+
-+#include <iostream>
-+#include <string>
-+#include <boost/bind.hpp>
-+#include <boost/asio.hpp>
-+#include <boost/asio/error.hpp>
-+#include <boost/algorithm/string.hpp>
-+
-+#include "include/assert.h"
-+#include "SocketCommon.h"
-+
-+
-+using boost::asio::local::stream_protocol;
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+class CacheSession : public std::enable_shared_from_this<CacheSession> {
-+public:
-+ CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct);
-+ ~CacheSession();
-+
-+ stream_protocol::socket& socket();
-+ void start();
-+ void serial_handing_request();
-+ void parallel_handing_request();
-+
-+private:
-+
-+ void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
-+
-+ void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
-+
-+public:
-+ void send(std::string msg);
-+
-+private:
-+ uint64_t m_session_id;
-+ stream_protocol::socket m_dm_socket;
-+ ProcessMsg process_msg;
-+ CephContext* cct;
-+
-+ // Buffer used to store data received from the client.
-+ //std::array<char, 1024> data_;
-+ char m_buffer[1024];
-+};
-+
-+typedef std::shared_ptr<CacheSession> CacheSessionPtr;
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+
-+#endif
-diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
-new file mode 100644
-index 0000000..50721ca
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
-@@ -0,0 +1,172 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#include "ObjectCacheStore.h"
-+
-+#define dout_context g_ceph_context
-+#define dout_subsys ceph_subsys_immutable_obj_cache
-+#undef dout_prefix
-+#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \
-+ << __func__ << ": "
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
-+ : m_cct(cct), m_work_queue(work_queue),
-+ m_rados(new librados::Rados()) {
-+
-+ uint64_t object_cache_entries =
-+ cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
-+
-+ //TODO(): allow to set level
-+ m_policy = new SimplePolicy(object_cache_entries, 0.5);
-+}
-+
-+ObjectCacheStore::~ObjectCacheStore() {
-+ delete m_policy;
-+}
-+
-+int ObjectCacheStore::init(bool reset) {
-+
-+ int ret = m_rados->init_with_context(m_cct);
-+ if(ret < 0) {
-+ lderr(m_cct) << "fail to init Ceph context" << dendl;
-+ return ret;
-+ }
-+
-+ ret = m_rados->connect();
-+ if(ret < 0 ) {
-+ lderr(m_cct) << "fail to conect to cluster" << dendl;
-+ return ret;
-+ }
-+
-+ std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
-+ //TODO(): check and reuse existing cache objects
-+ if(reset) {
-+ std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path;
-+ //TODO(): to use std::filesystem
-+    int r = system(cmd.c_str());
-+    if (r != 0) {
-+      lderr(m_cct) << "failed to reset cache directory" << dendl;
-+    }
-+ }
-+
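-+  // background eviction thread, driven by the cache policy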
-+ evict_thd = new std::thread([this]{this->evict_thread_body();});
-+ return ret;
-+}
-+
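-+// synchronously fetch the object from the cluster and persist it to the
-+// local cache file, then mark it promoted in the policy.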
-+int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
-+ int ret = 0;
-+ std::string cache_file_name = pool_name + object_name;
-+
-+ //TODO(): lock on ioctx map
-+ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
-+ librados::IoCtx* io_ctx = new librados::IoCtx();
-+ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
-+ if (ret < 0) {
-+ lderr(m_cct) << "fail to create ioctx" << dendl;
-+ assert(0);
-+ }
-+ m_ioctxs.emplace(pool_name, io_ctx);
-+ }
-+
-+ assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
-+
-+ librados::IoCtx* ioctx = m_ioctxs[pool_name];
-+
-+ librados::bufferlist* read_buf = new librados::bufferlist();
-+ int object_size = 4096*1024; //TODO(): read config from image metadata
-+
-+ //TODO(): async promote
-+ ret = promote_object(ioctx, object_name, read_buf, object_size);
-+ if (ret == -ENOENT) {
-+ read_buf->append(std::string(object_size, '0'));
-+ ret = 0;
-+ }
-+
-+  if (ret < 0) {
-+    lderr(m_cct) << "failed to read from rados" << dendl;
-+    delete read_buf;
-+    return ret;
-+  }
-+
-+  // persist to cache
-+ librbd::cache::SyncFile cache_file(m_cct, cache_file_name);
-+ cache_file.open();
-+ ret = cache_file.write_object_to_file(*read_buf, object_size);
-+
-+ // update metadata
-+ assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name));
-+ m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
-+ assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
-+
-+ return ret;
-+
-+}
-+
-+// return -1: the client needs to read the data from the cluster.
-+// return 0: the client can read the data directly from the cache.
-+int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
-+
-+ std::string cache_file_name = pool_name + object_name;
-+
-+ CACHESTATUS ret;
-+ ret = m_policy->lookup_object(cache_file_name);
-+
-+ switch(ret) {
-+ case OBJ_CACHE_NONE:
-+ return do_promote(pool_name, object_name);
-+ case OBJ_CACHE_PROMOTED:
-+ return 0;
-+ case OBJ_CACHE_PROMOTING:
-+ default:
-+ return -1;
-+ }
-+}
-+
-+void ObjectCacheStore::evict_thread_body() {
-+ int ret;
-+ while(m_evict_go) {
-+ ret = evict_objects();
-+ }
-+}
-+
-+
-+int ObjectCacheStore::shutdown() {
-+ m_evict_go = false;
-+ evict_thd->join();
-+ m_rados->shutdown();
-+ return 0;
-+}
-+
-+int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
-+ return 0;
-+}
-+
-+int ObjectCacheStore::lock_cache(std::string vol_name) {
-+ return 0;
-+}
-+
-+int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) {
-+ int ret;
-+
-+ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion();
-+
-+ ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
-+ if(ret < 0) {
-+ lderr(m_cct) << "fail to read from rados" << dendl;
-+ return ret;
-+ }
-+ read_completion->wait_for_complete();
-+ ret = read_completion->get_return_value();
-+ return ret;
-+
-+}
-+
-+int ObjectCacheStore::evict_objects() {
-+ std::list<std::string> obj_list;
-+ m_policy->get_evict_list(&obj_list);
-+ for (auto& obj: obj_list) {
-+ //do_evict(obj);
-+  }
-+  return 0;
-+}
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
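
A minimal sketch (outside the patch) of the return-code contract documented above: 0 means the object is already promoted and can be served from the local cache file, while a negative value sends the client back to RADOS. The caller `read_object()` and the stubbed `lookup_object()` here are hypothetical stand-ins, not code from the patch.

#include <iostream>
#include <string>

// Stub standing in for ObjectCacheStore::lookup_object(); the real method
// consults the policy and may kick off a promotion first.
static int lookup_object(const std::string& pool, const std::string& object) {
  return object == "hot" ? 0 : -1;  // pretend "hot" is already promoted
}

static void read_object(const std::string& pool, const std::string& object) {
  if (lookup_object(pool, object) == 0)
    std::cout << pool << "/" << object << ": served from the local cache file\n";
  else
    std::cout << pool << "/" << object << ": falling back to RADOS\n";
}

int main() {
  read_object("rbd", "hot");   // cache hit
  read_object("rbd", "cold");  // cache miss / still promoting
  return 0;
}
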
-diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
-new file mode 100644
-index 0000000..d044b27
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
-@@ -0,0 +1,70 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H
-+#define CEPH_CACHE_OBJECT_CACHE_STORE_H
-+
-+#include "common/debug.h"
-+#include "common/errno.h"
-+#include "common/ceph_context.h"
-+#include "common/Mutex.h"
-+#include "include/rados/librados.hpp"
-+#include "include/rbd/librbd.h"
-+#include "librbd/ImageCtx.h"
-+#include "librbd/ImageState.h"
-+#include "librbd/cache/SharedPersistentObjectCacherFile.h"
-+#include "SimplePolicy.hpp"
-+
-+
-+using librados::Rados;
-+using librados::IoCtx;
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+typedef shared_ptr<librados::Rados> RadosRef;
-+typedef shared_ptr<librados::IoCtx> IoCtxRef;
-+
-+class ObjectCacheStore
-+{
-+ public:
-+ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
-+ ~ObjectCacheStore();
-+
-+ int init(bool reset);
-+
-+ int shutdown();
-+
-+ int lookup_object(std::string pool_name, std::string object_name);
-+
-+ int init_cache(std::string vol_name, uint64_t vol_size);
-+
-+ int lock_cache(std::string vol_name);
-+
-+ private:
-+ void evict_thread_body();
-+ int evict_objects();
-+
-+ int do_promote(std::string pool_name, std::string object_name);
-+
-+ int promote_object(librados::IoCtx*, std::string object_name,
-+ librados::bufferlist* read_buf,
-+ uint64_t length);
-+
-+ CephContext *m_cct;
-+ ContextWQ* m_work_queue;
-+ RadosRef m_rados;
-+
-+
-+ std::map<std::string, librados::IoCtx*> m_ioctxs;
-+
-+ librbd::cache::SyncFile *m_cache_file;
-+
-+ Policy* m_policy;
-+ std::thread* evict_thd;
-+ bool m_evict_go = false;
-+};
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+#endif
-diff --git a/src/tools/ceph_immutable_object_cache/Policy.hpp b/src/tools/ceph_immutable_object_cache/Policy.hpp
-new file mode 100644
-index 0000000..8090202
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/Policy.hpp
-@@ -0,0 +1,33 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_CACHE_POLICY_HPP
-+#define CEPH_CACHE_POLICY_HPP
-+
-+#include <list>
-+#include <string>
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+enum CACHESTATUS {
-+ OBJ_CACHE_NONE = 0,
-+ OBJ_CACHE_PROMOTING,
-+ OBJ_CACHE_PROMOTED,
-+};
-+
-+
-+class Policy {
-+public:
-+ Policy(){}
-+ virtual ~Policy(){};
-+ virtual CACHESTATUS lookup_object(std::string) = 0;
-+ virtual int evict_object(std::string&) = 0;
-+ virtual void update_status(std::string, CACHESTATUS) = 0;
-+ virtual CACHESTATUS get_status(std::string) = 0;
-+ virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
-+};
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+#endif
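
The CACHESTATUS enum encodes a three-state lifecycle: NONE -> PROMOTING (claimed by the first lookup) -> PROMOTED (data persisted to the local cache file). A toy, single-threaded sketch of that state machine; ToyPolicy is an illustrative stand-in, while the real SimplePolicy adds locking, an LRU list, and a preallocated free list on top of the same transitions.

#include <cassert>
#include <iostream>
#include <map>
#include <string>

enum CACHESTATUS { OBJ_CACHE_NONE = 0, OBJ_CACHE_PROMOTING, OBJ_CACHE_PROMOTED };

class ToyPolicy {
  std::map<std::string, CACHESTATUS> m_status;
public:
  CACHESTATUS lookup_object(const std::string& name) {
    auto it = m_status.find(name);
    if (it == m_status.end()) {          // first lookup claims the entry
      m_status[name] = OBJ_CACHE_PROMOTING;
      return OBJ_CACHE_NONE;             // caller must promote it
    }
    return it->second;
  }
  void update_status(const std::string& name, CACHESTATUS s) {
    m_status[name] = s;
  }
};

int main() {
  ToyPolicy p;
  assert(p.lookup_object("obj") == OBJ_CACHE_NONE);       // miss: promotion starts
  assert(p.lookup_object("obj") == OBJ_CACHE_PROMOTING);  // later lookups see it in flight
  p.update_status("obj", OBJ_CACHE_PROMOTED);             // promotion finished
  assert(p.lookup_object("obj") == OBJ_CACHE_PROMOTED);   // now served from cache
  std::cout << "state machine ok" << std::endl;
  return 0;
}
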
-diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
-new file mode 100644
-index 0000000..757ee6a
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
-@@ -0,0 +1,163 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP
-+#define CEPH_CACHE_SIMPLE_POLICY_HPP
-+
-+#include "Policy.hpp"
-+#include "include/lru.h"
-+#include "common/RWLock.h"
-+#include "common/Mutex.h"
-+
-+#include <vector>
-+#include <unordered_map>
-+#include <string>
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+
-+class SimplePolicy : public Policy {
-+public:
-+ SimplePolicy(uint64_t block_num, float watermark)
-+ : m_watermark(watermark), m_entry_count(block_num),
-+ m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"),
-+ m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock")
-+ {
-+
-+ for(uint64_t i = 0; i < m_entry_count; i++) {
-+ m_free_list.push_back(new Entry());
-+ }
-+
-+ }
-+
-+ ~SimplePolicy() {
-+ for(uint64_t i = 0; i < m_entry_count; i++) {
-+ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
-+ delete entry;
-+ m_free_list.pop_front();
-+ }
-+ }
-+
-+ CACHESTATUS lookup_object(std::string cache_file_name) {
-+
-+ //TODO(): check race condition
-+ RWLock::WLocker wlocker(m_cache_map_lock);
-+
-+ auto entry_it = m_cache_map.find(cache_file_name);
-+ if(entry_it == m_cache_map.end()) {
-+ Mutex::Locker locker(m_free_list_lock);
-+ Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
-+ assert(entry != nullptr);
-+ m_free_list.pop_front();
-+ entry->status = OBJ_CACHE_PROMOTING;
-+
-+ m_cache_map[cache_file_name] = entry;
-+
-+ return OBJ_CACHE_NONE;
-+ }
-+
-+ Entry* entry = entry_it->second;
-+
-+ if(entry->status == OBJ_CACHE_PROMOTED) {
-+ // touch it
-+ m_promoted_lru.lru_touch(entry);
-+ }
-+
-+ return entry->status;
-+ }
-+
-+ int evict_object(std::string& out_cache_file_name) {
-+ RWLock::WLocker locker(m_cache_map_lock);
-+
-+ return 1;
-+ }
-+
-+ // TODO(): simplify the logic
-+ void update_status(std::string file_name, CACHESTATUS new_status) {
-+ RWLock::WLocker locker(m_cache_map_lock);
-+
-+ Entry* entry;
-+ auto entry_it = m_cache_map.find(file_name);
-+
-+ // just check.
-+ if(new_status == OBJ_CACHE_PROMOTING) {
-+ assert(entry_it == m_cache_map.end());
-+ }
-+
-+ assert(entry_it != m_cache_map.end());
-+
-+ entry = entry_it->second;
-+
-+ // promoting is done, so update it.
-+ if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) {
-+ m_promoted_lru.lru_insert_top(entry);
-+ entry->status = new_status;
-+ return;
-+ }
-+
-+ assert(0);
-+ }
-+
-+ // get entry status
-+ CACHESTATUS get_status(std::string file_name) {
-+ RWLock::RLocker locker(m_cache_map_lock);
-+ auto entry_it = m_cache_map.find(file_name);
-+ if(entry_it == m_cache_map.end()) {
-+ return OBJ_CACHE_NONE;
-+ }
-+
-+ return entry_it->second->status;
-+ }
-+
-+ void get_evict_list(std::list<std::string>* obj_list) {
-+ RWLock::WLocker locker(m_cache_map_lock);
-+ // check free ratio, pop entries from LRU
-+    if (static_cast<float>(m_free_list.size()) / m_entry_count < m_watermark) {
-+ int evict_num = 10; //TODO(): make this configurable
-+ for(int i = 0; i < evict_num; i++) {
-+ Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
-+ if (entry == nullptr) {
-+ continue;
-+ }
-+ std::string file_name = entry->cache_file_name;
-+ obj_list->push_back(file_name);
-+
-+ auto entry_it = m_cache_map.find(file_name);
-+ m_cache_map.erase(entry_it);
-+
-+ //mark this entry as free
-+ entry->status = OBJ_CACHE_NONE;
-+        Mutex::Locker free_list_locker(m_free_list_lock);
-+ m_free_list.push_back(entry);
-+ }
-+ }
-+ }
-+
-+private:
-+
-+ class Entry : public LRUObject {
-+ public:
-+ CACHESTATUS status;
-+ Entry() : status(OBJ_CACHE_NONE){}
-+ std::string cache_file_name;
-+ void encode(bufferlist &bl){}
-+ void decode(bufferlist::iterator &it){}
-+ };
-+
-+ float m_watermark;
-+ uint64_t m_entry_count;
-+
-+ std::unordered_map<std::string, Entry*> m_cache_map;
-+ RWLock m_cache_map_lock;
-+
-+ std::deque<Entry*> m_free_list;
-+ Mutex m_free_list_lock;
-+
-+ LRU m_promoted_lru; // include promoted, using status.
-+
-+};
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+#endif
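
get_evict_list() above compares the free-entry ratio against the watermark; the ratio needs floating-point division (hence the static_cast applied above), since a plain integer quotient truncates to 0 or 1. A small self-contained sketch of the check; should_evict() is a hypothetical helper, not part of the patch.

#include <cstdint>
#include <iostream>

// Eviction starts once the share of free entries drops below the watermark.
static bool should_evict(uint64_t free_entries, uint64_t total_entries,
                         float watermark) {
  return static_cast<float>(free_entries) / total_entries < watermark;
}

int main() {
  std::cout << should_evict(60, 100, 0.5f) << "\n";  // 0: 60% free, no eviction
  std::cout << should_evict(40, 100, 0.5f) << "\n";  // 1: 40% free, evict
  return 0;
}
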
-diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h
-new file mode 100644
-index 0000000..53dca54
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/SocketCommon.h
-@@ -0,0 +1,54 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#ifndef CEPH_CACHE_SOCKET_COMMON_H
-+#define CEPH_CACHE_SOCKET_COMMON_H
-+
-+namespace ceph {
-+namespace immutable_obj_cache {
-+
-+static const int RBDSC_REGISTER = 0X11;
-+static const int RBDSC_READ = 0X12;
-+static const int RBDSC_LOOKUP = 0X13;
-+static const int RBDSC_REGISTER_REPLY = 0X14;
-+static const int RBDSC_READ_REPLY = 0X15;
-+static const int RBDSC_LOOKUP_REPLY = 0X16;
-+static const int RBDSC_READ_RADOS = 0X17;
-+
-+
-+
-+typedef std::function<void(uint64_t, std::string)> ProcessMsg;
-+typedef std::function<void(std::string)> ClientProcessMsg;
-+typedef uint8_t rbdsc_req_type;
-+
-+//TODO(): switch to bufferlist
-+struct rbdsc_req_type_t {
-+ rbdsc_req_type type;
-+ uint64_t vol_size;
-+ uint64_t offset;
-+ uint64_t length;
-+ char pool_name[256];
-+ char vol_name[256];
-+
-+ uint64_t size() {
-+ return sizeof(rbdsc_req_type_t);
-+ }
-+
-+ std::string to_buffer() {
-+ std::stringstream ss;
-+ ss << type;
-+ ss << vol_size;
-+ ss << offset;
-+ ss << length;
-+ ss << pool_name;
-+ ss << vol_name;
-+
-+ return ss.str();
-+ }
-+};
-+
-+static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
-+
-+} // namespace immutable_obj_cache
-+} // namespace ceph
-+#endif
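
rbdsc_req_type_t travels over the socket as a raw, fixed-size buffer of RBDSC_MSG_LEN bytes, and the receiving side casts the bytes back to the struct. A self-contained round-trip sketch of that framing, using a trimmed copy of the struct (the real one also carries offset, length, and a 256-byte vol_name); shipping raw structs assumes both ends agree on padding and endianness, which is presumably why the TODO above suggests switching to bufferlist encoding.

#include <cstdint>
#include <cstring>
#include <iostream>

typedef uint8_t rbdsc_req_type;
struct rbdsc_req_type_t {
  rbdsc_req_type type;
  uint64_t vol_size;
  char pool_name[256];
};

int main() {
  rbdsc_req_type_t req = {};
  req.type = 0x12;                            // RBDSC_READ
  req.vol_size = 4096 * 1024;
  std::strncpy(req.pool_name, "rbd", sizeof(req.pool_name) - 1);

  char wire[sizeof(rbdsc_req_type_t)];
  std::memcpy(wire, &req, sizeof(req));       // what async_write puts on the socket

  rbdsc_req_type_t out = {};
  std::memcpy(&out, wire, sizeof(out));       // what handle_read casts back
  std::cout << "type=0x" << std::hex << int(out.type)
            << " pool=" << out.pool_name << std::endl;
  return 0;
}
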
-diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc
-new file mode 100644
-index 0000000..7a9131d
---- /dev/null
-+++ b/src/tools/ceph_immutable_object_cache/main.cc
-@@ -0,0 +1,85 @@
-+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-+// vim: ts=8 sw=2 smarttab
-+
-+#include "common/ceph_argparse.h"
-+#include "common/config.h"
-+#include "common/debug.h"
-+#include "common/errno.h"
-+#include "global/global_init.h"
-+#include "global/signal_handler.h"
-+#include "CacheController.h"
-+
-+#include <vector>
-+
-+ceph::immutable_obj_cache::CacheController *cachectl = nullptr;
-+
-+void usage() {
-+ std::cout << "usage: cache controller [options...]" << std::endl;
-+ std::cout << "options:\n";
-+ std::cout << " -m monaddress[:port] connect to specified monitor\n";
-+ std::cout << " --keyring=<path> path to keyring for local cluster\n";
-+ std::cout << " --log-file=<logfile> file to log debug output\n";
-+ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n";
-+ generic_server_usage();
-+}
-+
-+static void handle_signal(int signum)
-+{
-+ if (cachectl)
-+ cachectl->handle_signal(signum);
-+}
-+
-+int main(int argc, const char **argv)
-+{
-+ std::vector<const char*> args;
-+ env_to_vec(args);
-+ argv_to_vec(argc, argv, args);
-+
-+ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
-+ CODE_ENVIRONMENT_DAEMON,
-+ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
-+
-+ for (auto i = args.begin(); i != args.end(); ++i) {
-+ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
-+ usage();
-+ return EXIT_SUCCESS;
-+ }
-+ }
-+
-+ if (g_conf()->daemonize) {
-+ global_init_daemonize(g_ceph_context);
-+ }
-+ g_ceph_context->enable_perf_counter();
-+
-+ common_init_finish(g_ceph_context);
-+
-+ init_async_signal_handler();
-+ register_async_signal_handler(SIGHUP, sighup_handler);
-+ register_async_signal_handler_oneshot(SIGINT, handle_signal);
-+ register_async_signal_handler_oneshot(SIGTERM, handle_signal);
-+
-+ std::vector<const char*> cmd_args;
-+ argv_to_vec(argc, argv, cmd_args);
-+
-+ // disable unnecessary librbd cache
-+ g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
-+
-+ cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args);
-+ int r = cachectl->init();
-+ if (r < 0) {
-+ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
-+ goto cleanup;
-+ }
-+
-+ cachectl->run();
-+
-+ cleanup:
-+ unregister_async_signal_handler(SIGHUP, sighup_handler);
-+ unregister_async_signal_handler(SIGINT, handle_signal);
-+ unregister_async_signal_handler(SIGTERM, handle_signal);
-+ shutdown_async_signal_handler();
-+
-+ delete cachectl;
-+
-+  return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
-+}
-diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt
-deleted file mode 100644
-index 597d802..0000000
---- a/src/tools/rbd_cache/CMakeLists.txt
-+++ /dev/null
-@@ -1,9 +0,0 @@
--add_executable(rbd-cache
-- ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc
-- ObjectCacheStore.cc
-- CacheController.cc
-- main.cc)
--target_link_libraries(rbd-cache
-- librados
-- global)
--install(TARGETS rbd-cache DESTINATION bin)
-diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
-deleted file mode 100644
-index 620192c..0000000
---- a/src/tools/rbd_cache/CacheController.cc
-+++ /dev/null
-@@ -1,116 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#include "CacheController.h"
--
--#define dout_context g_ceph_context
--#define dout_subsys ceph_subsys_rbd_cache
--#undef dout_prefix
--#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \
-- << __func__ << ": "
--
--namespace rbd {
--namespace cache {
--
--class ThreadPoolSingleton : public ThreadPool {
--public:
-- ContextWQ *op_work_queue;
--
-- explicit ThreadPoolSingleton(CephContext *cct)
-- : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32,
-- "pcache_threads"),
-- op_work_queue(new ContextWQ("librbd::pcache_op_work_queue",
-- cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"),
-- this)) {
-- start();
-- }
-- ~ThreadPoolSingleton() override {
-- op_work_queue->drain();
-- delete op_work_queue;
--
-- stop();
-- }
--};
--
--
--CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
-- m_args(args), m_cct(cct) {
--
--}
--
--CacheController::~CacheController() {
--
--}
--
--int CacheController::init() {
-- ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
-- "rbd::cache::thread_pool", false, m_cct);
-- pcache_op_work_queue = thread_pool_singleton->op_work_queue;
--
-- m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
-- int r = m_object_cache_store->init(false);
-- if (r < 0) {
-- //derr << "init error\n" << dendl;
-- }
-- return r;
--}
--
--int CacheController::shutdown() {
-- int r = m_object_cache_store->shutdown();
-- return r;
--}
--
--void CacheController::handle_signal(int signum){}
--
--void CacheController::run() {
-- try {
-- //TODO(): use new socket path
-- std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
-- std::remove(controller_path.c_str());
--
-- m_cache_server = new CacheServer(controller_path,
-- ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
-- m_cache_server->run();
-- } catch (std::exception& e) {
-- std::cerr << "Exception: " << e.what() << "\n";
-- }
--}
--
--void CacheController::handle_request(uint64_t session_id, std::string msg){
-- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
--
-- int ret = 0;
--
-- switch (io_ctx->type) {
-- case RBDSC_REGISTER: {
-- // init cache layout for volume
-- m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
-- io_ctx->type = RBDSC_REGISTER_REPLY;
-- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
--
-- break;
-- }
-- case RBDSC_READ: {
-- // lookup object in local cache store
-- ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
-- if (ret < 0) {
-- io_ctx->type = RBDSC_READ_RADOS;
-- } else {
-- io_ctx->type = RBDSC_READ_REPLY;
-- }
-- if (io_ctx->type != RBDSC_READ_REPLY) {
-- assert(0);
-- }
-- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
--
-- break;
-- }
-- std::cout<<"can't recongize request"<<std::endl;
-- assert(0); // TODO replace it.
-- }
--}
--
--} // namespace rbd
--} // namespace cache
--
--
-diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
-deleted file mode 100644
-index 0e23484..0000000
---- a/src/tools/rbd_cache/CacheController.h
-+++ /dev/null
-@@ -1,54 +0,0 @@
--#ifndef CACHE_CONTROLLER_H
--#define CACHE_CONTROLLER_H
--
--#include <thread>
--
--#include "common/Formatter.h"
--#include "common/admin_socket.h"
--#include "common/debug.h"
--#include "common/errno.h"
--#include "common/ceph_context.h"
--#include "common/Mutex.h"
--#include "common/WorkQueue.h"
--#include "include/rados/librados.hpp"
--#include "include/rbd/librbd.h"
--#include "include/assert.h"
--#include "librbd/ImageCtx.h"
--#include "librbd/ImageState.h"
--
--#include "CacheControllerSocket.hpp"
--#include "ObjectCacheStore.h"
--
--
--using boost::asio::local::stream_protocol;
--
--namespace rbd {
--namespace cache {
--
--class CacheController {
-- public:
-- CacheController(CephContext *cct, const std::vector<const char*> &args);
-- ~CacheController();
--
-- int init();
--
-- int shutdown();
--
--  void handle_signal(int signum);
--
-- void run();
--
--  void handle_request(uint64_t session_id, std::string msg);
--
-- private:
-- CacheServer *m_cache_server;
-- std::vector<const char*> m_args;
-- CephContext *m_cct;
-- ObjectCacheStore *m_object_cache_store;
-- ContextWQ* pcache_op_work_queue;
--};
--
--} // namespace cache
--} // namespace rbd
--
--#endif
-diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
-deleted file mode 100644
-index 2ff7477..0000000
---- a/src/tools/rbd_cache/CacheControllerSocket.hpp
-+++ /dev/null
-@@ -1,228 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#ifndef CACHE_CONTROLLER_SOCKET_H
--#define CACHE_CONTROLLER_SOCKET_H
--
--#include <cstdio>
--#include <iostream>
--#include <array>
--#include <memory>
--#include <string>
--#include <boost/bind.hpp>
--#include <boost/asio.hpp>
--#include <boost/asio/error.hpp>
--#include <boost/algorithm/string.hpp>
--#include "CacheControllerSocketCommon.h"
--
--
--using boost::asio::local::stream_protocol;
--
--namespace rbd {
--namespace cache {
--
--class session : public std::enable_shared_from_this<session> {
--public:
-- session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
-- : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
--
-- stream_protocol::socket& socket() {
-- return m_dm_socket;
-- }
--
-- void start() {
-- if(true) {
-- serial_handing_request();
-- } else {
-- parallel_handing_request();
-- }
-- }
-- // flow:
-- //
-- // recv request --> process request --> reply ack
-- // | |
-- // --------------<-------------------------
-- void serial_handing_request() {
-- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
-- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-- boost::bind(&session::handle_read,
-- shared_from_this(),
-- boost::asio::placeholders::error,
-- boost::asio::placeholders::bytes_transferred));
-- }
--
-- // flow :
-- //
-- // --> thread 1: process request
-- // recv request --> thread 2: process request --> reply ack
-- // --> thread n: process request
-- //
-- void parallel_handing_request() {
-- // TODO
-- }
--
--private:
--
-- void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
--    // when we receive EOF, the most probable cause is that the client closed its socket,
--    // so the server side needs to stop handling this request loop.
-- if(error == boost::asio::error::eof) {
-- std::cout<<"session: async_read : " << error.message() << std::endl;
-- return;
-- }
--
-- if(error) {
-- std::cout<<"session: async_read fails: " << error.message() << std::endl;
-- assert(0);
-- }
--
-- if(bytes_transferred != RBDSC_MSG_LEN) {
-- std::cout<<"session : request in-complete. "<<std::endl;
-- assert(0);
-- }
--
--    // TODO: an async_process step would make this code more readable.
--    // the process_msg callback invokes the async send when done.
-- process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
-- }
--
-- void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
-- if (error) {
-- std::cout<<"session: async_write fails: " << error.message() << std::endl;
-- assert(0);
-- }
--
-- if(bytes_transferred != RBDSC_MSG_LEN) {
-- std::cout<<"session : reply in-complete. "<<std::endl;
-- assert(0);
-- }
--
-- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
-- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-- boost::bind(&session::handle_read,
-- shared_from_this(),
-- boost::asio::placeholders::error,
-- boost::asio::placeholders::bytes_transferred));
--
-- }
--
--public:
-- void send(std::string msg) {
-- boost::asio::async_write(m_dm_socket,
-- boost::asio::buffer(msg.c_str(), msg.size()),
-- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-- boost::bind(&session::handle_write,
-- shared_from_this(),
-- boost::asio::placeholders::error,
-- boost::asio::placeholders::bytes_transferred));
--
-- }
--
--private:
-- uint64_t m_session_id;
-- stream_protocol::socket m_dm_socket;
-- ProcessMsg process_msg;
--
-- // Buffer used to store data received from the client.
-- //std::array<char, 1024> data_;
-- char m_buffer[1024];
--};
--
--typedef std::shared_ptr<session> session_ptr;
--
--class CacheServer {
--public:
-- CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
-- : m_cct(cct), m_server_process_msg(processmsg),
-- m_local_path(file),
-- m_acceptor(m_io_service)
-- {}
--
-- void run() {
-- bool ret;
-- ret = start_accept();
-- if(!ret) {
-- return;
-- }
-- m_io_service.run();
-- }
--
-- // TODO : use callback to replace this function.
-- void send(uint64_t session_id, std::string msg) {
-- auto it = m_session_map.find(session_id);
-- if (it != m_session_map.end()) {
-- it->second->send(msg);
-- } else {
--      // TODO: why was the session id not found?
--      std::cout << "session id not found..." << std::endl;
-- assert(0);
-- }
-- }
--
--private:
--  // by creating the acceptor manually, every setup step can be controlled.
-- bool start_accept() {
-- boost::system::error_code ec;
-- m_acceptor.open(m_local_path.protocol(), ec);
-- if(ec) {
-- std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
-- return false;
-- }
--
-- // TODO control acceptor attribute.
--
-- m_acceptor.bind(m_local_path, ec);
-- if(ec) {
-- std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
-- return false;
-- }
--
-- m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
-- if(ec) {
-- std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
-- return false;
-- }
--
-- accept();
-- return true;
-- }
--
-- void accept() {
-- session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
-- m_acceptor.async_accept(new_session->socket(),
-- boost::bind(&CacheServer::handle_accept, this, new_session,
-- boost::asio::placeholders::error));
-- }
--
-- void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
-- //TODO(): open librbd snap ... yuan
--
-- if(error) {
-- std::cout << "async accept fails : " << error.message() << std::endl;
-- assert(0); // TODO
-- }
--
--    // the session must be added to m_session_map before session->start() runs
-- m_session_map.emplace(m_session_id, new_session);
-- // TODO : session setting
-- new_session->start();
-- m_session_id++;
--
--    // launch the next accept
-- accept();
-- }
--
--private:
-- CephContext* m_cct;
-- boost::asio::io_service m_io_service; // TODO wrapper it.
-- ProcessMsg m_server_process_msg;
-- stream_protocol::endpoint m_local_path;
-- stream_protocol::acceptor m_acceptor;
-- uint64_t m_session_id = 1;
-- std::map<uint64_t, session_ptr> m_session_map;
--};
--
--} // namespace cache
--} // namespace rbd
--
--#endif
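
Each session above runs a fixed loop: async_read a full RBDSC_MSG_LEN request, hand it to process_msg, async_write the ack, then rearm the read. A toy, asio-free sketch of that read -> process -> write cycle; ToySession and its members are illustrative names only, not from the patch.

#include <functional>
#include <iostream>
#include <string>

struct ToySession {
  std::function<std::string(const std::string&)> process_msg;

  void handle_read(const std::string& request) {
    std::string reply = process_msg(request);  // process the request
    handle_write(reply);                       // reply with an ack
  }
  void handle_write(const std::string& reply) {
    std::cout << "sent: " << reply << "\n";    // the real code rearms async_read() here
  }
};

int main() {
  ToySession s;
  s.process_msg = [](const std::string& req) { return "ack:" + req; };
  s.handle_read("RBDSC_READ rbd/obj1");        // one request/reply cycle
  return 0;
}
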
-diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
-deleted file mode 100644
-index 964f888..0000000
---- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
-+++ /dev/null
-@@ -1,229 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
--#define CACHE_CONTROLLER_SOCKET_CLIENT_H
--
--#include <atomic>
--#include <boost/asio.hpp>
--#include <boost/bind.hpp>
--#include <boost/asio/error.hpp>
--#include <boost/algorithm/string.hpp>
--#include "librbd/ImageCtx.h"
--#include "include/assert.h"
--#include "include/Context.h"
--#include "CacheControllerSocketCommon.h"
--
--
--using boost::asio::local::stream_protocol;
--
--namespace rbd {
--namespace cache {
--
--class CacheClient {
--public:
-- CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
-- : m_io_service_work(m_io_service),
-- m_dm_socket(m_io_service),
-- m_client_process_msg(processmsg),
-- m_ep(stream_protocol::endpoint(file)),
-- m_session_work(false),
-- cct(ceph_ctx)
-- {
-- // TODO wrapper io_service
-- std::thread thd([this](){
-- m_io_service.run();});
-- thd.detach();
-- }
--
-- void run(){
-- }
--
-- bool is_session_work() {
-- return m_session_work.load() == true;
-- }
--
--  // call this method only when an error occurs.
-- void close() {
-- m_session_work.store(false);
-- boost::system::error_code close_ec;
-- m_dm_socket.close(close_ec);
-- if(close_ec) {
-- std::cout << "close: " << close_ec.message() << std::endl;
-- }
-- std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
-- }
--
-- int connect() {
-- boost::system::error_code ec;
-- m_dm_socket.connect(m_ep, ec);
-- if(ec) {
-- if(ec == boost::asio::error::connection_refused) {
--        std::cout << ec.message() << " : maybe the rbd-cache controller is not running. "
--                  << "Data will now be read from the ceph cluster " << std::endl;
-- } else {
-- std::cout << "connect: " << ec.message() << std::endl;
-- }
--
-- if(m_dm_socket.is_open()) {
-- // Set to indicate what error occurred, if any.
-- // Note that, even if the function indicates an error,
-- // the underlying descriptor is closed.
-- boost::system::error_code close_ec;
-- m_dm_socket.close(close_ec);
-- if(close_ec) {
-- std::cout << "close: " << close_ec.message() << std::endl;
-- }
-- }
-- return -1;
-- }
--
-- std::cout<<"connect success"<<std::endl;
--
-- return 0;
-- }
--
-- int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
-- // cache controller will init layout
-- rbdsc_req_type_t *message = new rbdsc_req_type_t();
-- message->type = RBDSC_REGISTER;
-- memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
-- memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
-- message->vol_size = vol_size;
-- message->offset = 0;
-- message->length = 0;
--
-- uint64_t ret;
-- boost::system::error_code ec;
--
-- ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
-- if(ec) {
-- std::cout << "write fails : " << ec.message() << std::endl;
-- return -1;
-- }
--
-- if(ret != message->size()) {
-- std::cout << "write fails : ret != send_bytes "<< std::endl;
-- return -1;
-- }
--
--    // TODO: hard-coded message length
-- ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
-- if(ec == boost::asio::error::eof) {
-- std::cout<< "recv eof"<<std::endl;
-- return -1;
-- }
--
-- if(ec) {
-- std::cout << "write fails : " << ec.message() << std::endl;
-- return -1;
-- }
--
-- if(ret != RBDSC_MSG_LEN) {
-- std::cout << "write fails : ret != receive bytes " << std::endl;
-- return -1;
-- }
--
-- m_client_process_msg(std::string(m_recv_buffer, ret));
--
-- delete message;
--
-- std::cout << "register volume success" << std::endl;
--
-- // TODO
-- m_session_work.store(true);
--
-- return 0;
-- }
--
--  // if any error occurs, we just return false and read from rados instead.
-- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
-- rbdsc_req_type_t *message = new rbdsc_req_type_t();
-- message->type = RBDSC_READ;
-- memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
-- memcpy(message->vol_name, object_id.c_str(), object_id.size());
-- message->vol_size = 0;
-- message->offset = 0;
-- message->length = 0;
--
-- boost::asio::async_write(m_dm_socket,
-- boost::asio::buffer((char*)message, message->size()),
-- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-- [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
-- delete message;
-- if(err) {
-- std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
-- close();
-- on_finish->complete(false);
-- return;
-- }
-- if(cb != RBDSC_MSG_LEN) {
-- std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
-- close();
-- on_finish->complete(false);
-- return;
-- }
-- get_result(on_finish);
-- });
--
-- return 0;
-- }
--
-- void get_result(Context* on_finish) {
-- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
-- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
-- [this, on_finish](const boost::system::error_code& err, size_t cb) {
-- if(err == boost::asio::error::eof) {
-- std::cout<<"get_result: ack is EOF." << std::endl;
-- close();
-- on_finish->complete(false);
-- return;
-- }
-- if(err) {
-- std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
-- close();
--            on_finish->complete(false); // TODO: replace this with proper error handling.
-- return;
-- }
--          if (cb != RBDSC_MSG_LEN) {
--            close();
--            std::cout << "get_result: incomplete ack." << std::endl;
--            on_finish->complete(false); // TODO: replace this with proper error handling.
--            return;
--          }
--
-- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
--
--          // TODO: reproduce yuan's bug
-- if(io_ctx->type == RBDSC_READ) {
-- std::cout << "get rbdsc_read... " << std::endl;
-- assert(0);
-- }
--
-- if (io_ctx->type == RBDSC_READ_REPLY) {
-- on_finish->complete(true);
-- return;
-- } else {
-- on_finish->complete(false);
-- return;
-- }
-- });
-- }
--
--private:
-- boost::asio::io_service m_io_service;
-- boost::asio::io_service::work m_io_service_work;
-- stream_protocol::socket m_dm_socket;
-- ClientProcessMsg m_client_process_msg;
-- stream_protocol::endpoint m_ep;
-- char m_recv_buffer[1024];
--
--  // modified atomically:
--  // thread 1 : the asio callback thread modifies it.
--  // thread 2 : librbd reads it.
-- std::atomic<bool> m_session_work;
-- CephContext* cct;
--};
--
--} // namespace cache
--} // namespace rbd
--#endif
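
lookup_object()/get_result() above resolve a Context exactly once: complete(true) on an RBDSC_READ_REPLY ack, complete(false) on any error, EOF, or short read, so librbd can fall back to RADOS. A minimal sketch of that completion contract; ToyContext and handle_reply() are hypothetical stand-ins for Ceph's Context and the tail of get_result().

#include <functional>
#include <iostream>

struct ToyContext {
  std::function<void(bool)> fn;
  void complete(bool hit) { fn(hit); }  // resolved exactly once per lookup
};

// Inspect the ack type and complete the caller's context accordingly.
static void handle_reply(int reply_type, ToyContext* on_finish) {
  const int RBDSC_READ_REPLY = 0x15;
  on_finish->complete(reply_type == RBDSC_READ_REPLY);
}

int main() {
  ToyContext ctx;
  ctx.fn = [](bool hit) {
    std::cout << (hit ? "read from shared cache" : "read from cluster") << std::endl;
  };
  handle_reply(0x15, &ctx);  // RBDSC_READ_REPLY -> serve from cache
  handle_reply(0x17, &ctx);  // RBDSC_READ_RADOS -> fall back to RADOS
  return 0;
}
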
-diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
-deleted file mode 100644
-index e17529a..0000000
---- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
-+++ /dev/null
-@@ -1,62 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
--#define CACHE_CONTROLLER_SOCKET_COMMON_H
--
--/*
--#define RBDSC_REGISTER 0X11
--#define RBDSC_READ 0X12
--#define RBDSC_LOOKUP 0X13
--#define RBDSC_REGISTER_REPLY 0X14
--#define RBDSC_READ_REPLY 0X15
--#define RBDSC_LOOKUP_REPLY 0X16
--#define RBDSC_READ_RADOS 0X17
--*/
--
--namespace rbd {
--namespace cache {
--
--static const int RBDSC_REGISTER = 0X11;
--static const int RBDSC_READ = 0X12;
--static const int RBDSC_LOOKUP = 0X13;
--static const int RBDSC_REGISTER_REPLY = 0X14;
--static const int RBDSC_READ_REPLY = 0X15;
--static const int RBDSC_LOOKUP_REPLY = 0X16;
--static const int RBDSC_READ_RADOS = 0X17;
--
--
--
--typedef std::function<void(uint64_t, std::string)> ProcessMsg;
--typedef std::function<void(std::string)> ClientProcessMsg;
--typedef uint8_t rbdsc_req_type;
--struct rbdsc_req_type_t {
-- rbdsc_req_type type;
-- uint64_t vol_size;
-- uint64_t offset;
-- uint64_t length;
-- char pool_name[256];
-- char vol_name[256];
--
-- uint64_t size() {
-- return sizeof(rbdsc_req_type_t);
-- }
--
-- std::string to_buffer() {
-- std::stringstream ss;
-- ss << type;
-- ss << vol_size;
-- ss << offset;
-- ss << length;
-- ss << pool_name;
-- ss << vol_name;
--
-- return ss.str();
-- }
--};
--
--static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
--
--} // namespace cache
--} // namespace rbd
--#endif
-diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc
-deleted file mode 100644
-index 99f90d6..0000000
---- a/src/tools/rbd_cache/ObjectCacheStore.cc
-+++ /dev/null
-@@ -1,172 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#include "ObjectCacheStore.h"
--
--#define dout_context g_ceph_context
--#define dout_subsys ceph_subsys_rbd_cache
--#undef dout_prefix
--#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \
-- << __func__ << ": "
--
--namespace rbd {
--namespace cache {
--
--ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
-- : m_cct(cct), m_work_queue(work_queue),
-- m_rados(new librados::Rados()) {
--
-- uint64_t object_cache_entries =
-- cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
--
-- //TODO(): allow to set level
-- m_policy = new SimplePolicy(object_cache_entries, 0.5);
--}
--
--ObjectCacheStore::~ObjectCacheStore() {
-- delete m_policy;
--}
--
--int ObjectCacheStore::init(bool reset) {
--
-- int ret = m_rados->init_with_context(m_cct);
-- if(ret < 0) {
-- lderr(m_cct) << "fail to init Ceph context" << dendl;
-- return ret;
-- }
--
-- ret = m_rados->connect();
-- if(ret < 0 ) {
-- lderr(m_cct) << "fail to conect to cluster" << dendl;
-- return ret;
-- }
--
-- std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
-- //TODO(): check and reuse existing cache objects
-- if(reset) {
-- std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path;
-- //TODO(): to use std::filesystem
-- int r = system(cmd.c_str());
-- }
--
-- evict_thd = new std::thread([this]{this->evict_thread_body();});
-- return ret;
--}
--
--int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
-- int ret = 0;
-- std::string cache_file_name = pool_name + object_name;
--
-- //TODO(): lock on ioctx map
-- if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
-- librados::IoCtx* io_ctx = new librados::IoCtx();
-- ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
-- if (ret < 0) {
-- lderr(m_cct) << "fail to create ioctx" << dendl;
-- assert(0);
-- }
-- m_ioctxs.emplace(pool_name, io_ctx);
-- }
--
-- assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
--
-- librados::IoCtx* ioctx = m_ioctxs[pool_name];
--
-- librados::bufferlist* read_buf = new librados::bufferlist();
-- int object_size = 4096*1024; //TODO(): read config from image metadata
--
-- //TODO(): async promote
-- ret = promote_object(ioctx, object_name, read_buf, object_size);
-- if (ret == -ENOENT) {
-- read_buf->append(std::string(object_size, '0'));
-- ret = 0;
-- }
--
-- if( ret < 0) {
-- lderr(m_cct) << "fail to read from rados" << dendl;
-- return ret;
-- }
--
-- // persistent to cache
-- librbd::cache::SyncFile cache_file(m_cct, cache_file_name);
-- cache_file.open();
-- ret = cache_file.write_object_to_file(*read_buf, object_size);
--
-- // update metadata
-- assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name));
-- m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
-- assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
--
-- return ret;
--
--}
--
--// return -1: the client needs to read the data from the cluster.
--// return 0: the client can read the data directly from the cache.
--int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
--
-- std::string cache_file_name = pool_name + object_name;
--
-- CACHESTATUS ret;
-- ret = m_policy->lookup_object(cache_file_name);
--
-- switch(ret) {
-- case OBJ_CACHE_NONE:
-- return do_promote(pool_name, object_name);
-- case OBJ_CACHE_PROMOTED:
-- return 0;
-- case OBJ_CACHE_PROMOTING:
-- default:
-- return -1;
-- }
--}
--
--void ObjectCacheStore::evict_thread_body() {
-- int ret;
-- while(m_evict_go) {
-- ret = evict_objects();
-- }
--}
--
--
--int ObjectCacheStore::shutdown() {
-- m_evict_go = false;
-- evict_thd->join();
-- m_rados->shutdown();
-- return 0;
--}
--
--int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
-- return 0;
--}
--
--int ObjectCacheStore::lock_cache(std::string vol_name) {
-- return 0;
--}
--
--int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) {
-- int ret;
--
-- librados::AioCompletion* read_completion = librados::Rados::aio_create_completion();
--
-- ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
-- if(ret < 0) {
-- lderr(m_cct) << "fail to read from rados" << dendl;
-- return ret;
-- }
-- read_completion->wait_for_complete();
-- ret = read_completion->get_return_value();
-- return ret;
--
--}
--
--int ObjectCacheStore::evict_objects() {
-- std::list<std::string> obj_list;
-- m_policy->get_evict_list(&obj_list);
-- for (auto& obj: obj_list) {
-- //do_evict(obj);
--  }
--  return 0;
--}
--
--} // namespace cache
--} // namespace rbd
-diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h
-deleted file mode 100644
-index ba0e1f1..0000000
---- a/src/tools/rbd_cache/ObjectCacheStore.h
-+++ /dev/null
-@@ -1,70 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#ifndef OBJECT_CACHE_STORE_H
--#define OBJECT_CACHE_STORE_H
--
--#include "common/debug.h"
--#include "common/errno.h"
--#include "common/ceph_context.h"
--#include "common/Mutex.h"
--#include "include/rados/librados.hpp"
--#include "include/rbd/librbd.h"
--#include "librbd/ImageCtx.h"
--#include "librbd/ImageState.h"
--#include "librbd/cache/SharedPersistentObjectCacherFile.h"
--#include "SimplePolicy.hpp"
--
--
--using librados::Rados;
--using librados::IoCtx;
--
--namespace rbd {
--namespace cache {
--
--typedef shared_ptr<librados::Rados> RadosRef;
--typedef shared_ptr<librados::IoCtx> IoCtxRef;
--
--class ObjectCacheStore
--{
-- public:
-- ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
-- ~ObjectCacheStore();
--
-- int init(bool reset);
--
-- int shutdown();
--
-- int lookup_object(std::string pool_name, std::string object_name);
--
-- int init_cache(std::string vol_name, uint64_t vol_size);
--
-- int lock_cache(std::string vol_name);
--
-- private:
-- void evict_thread_body();
-- int evict_objects();
--
-- int do_promote(std::string pool_name, std::string object_name);
--
-- int promote_object(librados::IoCtx*, std::string object_name,
-- librados::bufferlist* read_buf,
-- uint64_t length);
--
-- CephContext *m_cct;
-- ContextWQ* m_work_queue;
-- RadosRef m_rados;
--
--
-- std::map<std::string, librados::IoCtx*> m_ioctxs;
--
-- librbd::cache::SyncFile *m_cache_file;
--
-- Policy* m_policy;
-- std::thread* evict_thd;
-- bool m_evict_go = false;
--};
--
--} // namespace cache
--} // namespace rbd
--#endif
-diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp
-deleted file mode 100644
-index 711e3bd..0000000
---- a/src/tools/rbd_cache/Policy.hpp
-+++ /dev/null
-@@ -1,30 +0,0 @@
--#ifndef RBD_CACHE_POLICY_HPP
--#define RBD_CACHE_POLICY_HPP
--
--#include <list>
--#include <string>
--
--namespace rbd {
--namespace cache {
--
--enum CACHESTATUS {
-- OBJ_CACHE_NONE = 0,
-- OBJ_CACHE_PROMOTING,
-- OBJ_CACHE_PROMOTED,
--};
--
--
--class Policy {
--public:
-- Policy(){}
-- virtual ~Policy(){};
-- virtual CACHESTATUS lookup_object(std::string) = 0;
-- virtual int evict_object(std::string&) = 0;
-- virtual void update_status(std::string, CACHESTATUS) = 0;
-- virtual CACHESTATUS get_status(std::string) = 0;
-- virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
--};
--
--} // namespace cache
--} // namespace rbd
--#endif
-diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp
-deleted file mode 100644
-index e785de1..0000000
---- a/src/tools/rbd_cache/SimplePolicy.hpp
-+++ /dev/null
-@@ -1,160 +0,0 @@
--#ifndef RBD_CACHE_SIMPLE_POLICY_HPP
--#define RBD_CACHE_SIMPLE_POLICY_HPP
--
--#include "Policy.hpp"
--#include "include/lru.h"
--#include "common/RWLock.h"
--#include "common/Mutex.h"
--
--#include <vector>
--#include <unordered_map>
--#include <string>
--
--namespace rbd {
--namespace cache {
--
--
--class SimplePolicy : public Policy {
--public:
-- SimplePolicy(uint64_t block_num, float watermark)
-- : m_watermark(watermark), m_entry_count(block_num),
-- m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"),
-- m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock")
-- {
--
-- for(uint64_t i = 0; i < m_entry_count; i++) {
-- m_free_list.push_back(new Entry());
-- }
--
-- }
--
-- ~SimplePolicy() {
-- for(uint64_t i = 0; i < m_entry_count; i++) {
-- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
-- delete entry;
-- m_free_list.pop_front();
-- }
-- }
--
-- CACHESTATUS lookup_object(std::string cache_file_name) {
--
-- //TODO(): check race condition
-- RWLock::WLocker wlocker(m_cache_map_lock);
--
-- auto entry_it = m_cache_map.find(cache_file_name);
-- if(entry_it == m_cache_map.end()) {
-- Mutex::Locker locker(m_free_list_lock);
-- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
-- assert(entry != nullptr);
-- m_free_list.pop_front();
-- entry->status = OBJ_CACHE_PROMOTING;
--
-- m_cache_map[cache_file_name] = entry;
--
-- return OBJ_CACHE_NONE;
-- }
--
-- Entry* entry = entry_it->second;
--
-- if(entry->status == OBJ_CACHE_PROMOTED) {
-- // touch it
-- m_promoted_lru.lru_touch(entry);
-- }
--
-- return entry->status;
-- }
--
-- int evict_object(std::string& out_cache_file_name) {
-- RWLock::WLocker locker(m_cache_map_lock);
--
-- return 1;
-- }
--
-- // TODO(): simplify the logic
-- void update_status(std::string file_name, CACHESTATUS new_status) {
-- RWLock::WLocker locker(m_cache_map_lock);
--
-- Entry* entry;
-- auto entry_it = m_cache_map.find(file_name);
--
-- // just check.
-- if(new_status == OBJ_CACHE_PROMOTING) {
-- assert(entry_it == m_cache_map.end());
-- }
--
-- assert(entry_it != m_cache_map.end());
--
-- entry = entry_it->second;
--
-- // promoting is done, so update it.
-- if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) {
-- m_promoted_lru.lru_insert_top(entry);
-- entry->status = new_status;
-- return;
-- }
--
-- assert(0);
-- }
--
-- // get entry status
-- CACHESTATUS get_status(std::string file_name) {
-- RWLock::RLocker locker(m_cache_map_lock);
-- auto entry_it = m_cache_map.find(file_name);
-- if(entry_it == m_cache_map.end()) {
-- return OBJ_CACHE_NONE;
-- }
--
-- return entry_it->second->status;
-- }
--
-- void get_evict_list(std::list<std::string>* obj_list) {
-- RWLock::WLocker locker(m_cache_map_lock);
-- // check free ratio, pop entries from LRU
--    if (static_cast<float>(m_free_list.size()) / m_entry_count < m_watermark) {
-- int evict_num = 10; //TODO(): make this configurable
-- for(int i = 0; i < evict_num; i++) {
-- Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
-- if (entry == nullptr) {
-- continue;
-- }
-- std::string file_name = entry->cache_file_name;
-- obj_list->push_back(file_name);
--
-- auto entry_it = m_cache_map.find(file_name);
-- m_cache_map.erase(entry_it);
--
-- //mark this entry as free
-- entry->status = OBJ_CACHE_NONE;
--        Mutex::Locker free_list_locker(m_free_list_lock);
-- m_free_list.push_back(entry);
-- }
-- }
-- }
--
--private:
--
-- class Entry : public LRUObject {
-- public:
-- CACHESTATUS status;
-- Entry() : status(OBJ_CACHE_NONE){}
-- std::string cache_file_name;
-- void encode(bufferlist &bl){}
-- void decode(bufferlist::iterator &it){}
-- };
--
-- float m_watermark;
-- uint64_t m_entry_count;
--
-- std::unordered_map<std::string, Entry*> m_cache_map;
-- RWLock m_cache_map_lock;
--
-- std::deque<Entry*> m_free_list;
-- Mutex m_free_list_lock;
--
-- LRU m_promoted_lru; // include promoted, using status.
--
--};
--
--} // namespace cache
--} // namespace rbd
--#endif
-diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc
-deleted file mode 100644
-index d604760..0000000
---- a/src/tools/rbd_cache/main.cc
-+++ /dev/null
-@@ -1,85 +0,0 @@
--// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
--// vim: ts=8 sw=2 smarttab
--
--#include "common/ceph_argparse.h"
--#include "common/config.h"
--#include "common/debug.h"
--#include "common/errno.h"
--#include "global/global_init.h"
--#include "global/signal_handler.h"
--#include "CacheController.h"
--
--#include <vector>
--
--rbd::cache::CacheController *cachectl = nullptr;
--
--void usage() {
-- std::cout << "usage: cache controller [options...]" << std::endl;
-- std::cout << "options:\n";
-- std::cout << " -m monaddress[:port] connect to specified monitor\n";
-- std::cout << " --keyring=<path> path to keyring for local cluster\n";
-- std::cout << " --log-file=<logfile> file to log debug output\n";
-- std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n";
-- generic_server_usage();
--}
--
--static void handle_signal(int signum)
--{
-- if (cachectl)
-- cachectl->handle_signal(signum);
--}
--
--int main(int argc, const char **argv)
--{
-- std::vector<const char*> args;
-- env_to_vec(args);
-- argv_to_vec(argc, argv, args);
--
-- auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
-- CODE_ENVIRONMENT_DAEMON,
-- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
--
-- for (auto i = args.begin(); i != args.end(); ++i) {
-- if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
-- usage();
-- return EXIT_SUCCESS;
-- }
-- }
--
-- if (g_conf()->daemonize) {
-- global_init_daemonize(g_ceph_context);
-- }
-- g_ceph_context->enable_perf_counter();
--
-- common_init_finish(g_ceph_context);
--
-- init_async_signal_handler();
-- register_async_signal_handler(SIGHUP, sighup_handler);
-- register_async_signal_handler_oneshot(SIGINT, handle_signal);
-- register_async_signal_handler_oneshot(SIGTERM, handle_signal);
--
-- std::vector<const char*> cmd_args;
-- argv_to_vec(argc, argv, cmd_args);
--
-- // disable unnecessary librbd cache
-- g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
--
-- cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args);
-- int r = cachectl->init();
-- if (r < 0) {
-- std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
-- goto cleanup;
-- }
--
-- cachectl->run();
--
-- cleanup:
-- unregister_async_signal_handler(SIGHUP, sighup_handler);
-- unregister_async_signal_handler(SIGINT, handle_signal);
-- unregister_async_signal_handler(SIGTERM, handle_signal);
-- shutdown_async_signal_handler();
--
-- delete cachectl;
--
--  return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
--}
---
-2.7.4
-