diff --git a/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch
new file mode 100644
index 0000000..46040e6
--- /dev/null
+++ b/src/ceph/0010-librbd-new-namespace-ceph-immutable-obj-cache.patch
@@ -0,0 +1,3510 @@
+From 9a097084d06e7186206b43d9c81d1f648791d7a4 Mon Sep 17 00:00:00 2001
+From: Yuan Zhou <yuan.zhou@intel.com>
+Date: Fri, 7 Sep 2018 08:29:51 +0800
+Subject: [PATCH 10/10] librbd: new namespace ceph immutable obj cache
+
+clean up class/func names to use the new ceph::immutable_obj_cache
+namespace: move the cache daemon from src/tools/rbd_cache to
+src/tools/ceph_immutable_object_cache, rename the librbd dispatch
+layer to SharedReadOnlyObjectDispatch, add an immutable_obj_cache
+debug subsystem, and change the rbd_shared_cache_enabled default
+to false.
+
+Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
+---
+ src/common/options.cc | 2 +-
+ src/common/subsys.h | 3 +-
+ src/librbd/CMakeLists.txt | 3 +-
+ .../SharedPersistentObjectCacherObjectDispatch.cc | 175 ----------------
+ .../SharedPersistentObjectCacherObjectDispatch.h | 133 ------------
+ src/librbd/cache/SharedReadOnlyObjectDispatch.cc | 170 +++++++++++++++
+ src/librbd/cache/SharedReadOnlyObjectDispatch.h | 126 ++++++++++++
+ src/librbd/image/OpenRequest.cc | 4 +-
+ src/tools/CMakeLists.txt | 2 +-
+ .../ceph_immutable_object_cache/CMakeLists.txt | 11 +
+ .../ceph_immutable_object_cache/CacheClient.cc | 205 ++++++++++++++++++
+ .../ceph_immutable_object_cache/CacheClient.h | 53 +++++
+ .../ceph_immutable_object_cache/CacheController.cc | 117 +++++++++++
+ .../ceph_immutable_object_cache/CacheController.h | 53 +++++
+ .../ceph_immutable_object_cache/CacheServer.cc | 99 +++++++++
+ .../ceph_immutable_object_cache/CacheServer.h | 54 +++++
+ .../ceph_immutable_object_cache/CacheSession.cc | 115 +++++++++++
+ .../ceph_immutable_object_cache/CacheSession.h | 58 ++++++
+ .../ObjectCacheStore.cc | 172 ++++++++++++++++
+ .../ceph_immutable_object_cache/ObjectCacheStore.h | 70 +++++++
+ src/tools/ceph_immutable_object_cache/Policy.hpp | 33 +++
+ .../ceph_immutable_object_cache/SimplePolicy.hpp | 163 +++++++++++++++
+ .../ceph_immutable_object_cache/SocketCommon.h | 54 +++++
+ src/tools/ceph_immutable_object_cache/main.cc | 85 ++++++++
+ src/tools/rbd_cache/CMakeLists.txt | 9 -
+ src/tools/rbd_cache/CacheController.cc | 116 -----------
+ src/tools/rbd_cache/CacheController.h | 54 -----
+ src/tools/rbd_cache/CacheControllerSocket.hpp | 228 --------------------
+ .../rbd_cache/CacheControllerSocketClient.hpp | 229 ---------------------
+ src/tools/rbd_cache/CacheControllerSocketCommon.h | 62 ------
+ src/tools/rbd_cache/ObjectCacheStore.cc | 172 ----------------
+ src/tools/rbd_cache/ObjectCacheStore.h | 70 -------
+ src/tools/rbd_cache/Policy.hpp | 30 ---
+ src/tools/rbd_cache/SimplePolicy.hpp | 160 --------------
+ src/tools/rbd_cache/main.cc | 85 --------
+ 35 files changed, 1646 insertions(+), 1529 deletions(-)
+ delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+ delete mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+ create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.cc
+ create mode 100644 src/librbd/cache/SharedReadOnlyObjectDispatch.h
+ create mode 100644 src/tools/ceph_immutable_object_cache/CMakeLists.txt
+ create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.cc
+ create mode 100644 src/tools/ceph_immutable_object_cache/CacheClient.h
+ create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.cc
+ create mode 100644 src/tools/ceph_immutable_object_cache/CacheController.h
+ create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.cc
+ create mode 100644 src/tools/ceph_immutable_object_cache/CacheServer.h
+ create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.cc
+ create mode 100644 src/tools/ceph_immutable_object_cache/CacheSession.h
+ create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
+ create mode 100644 src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
+ create mode 100644 src/tools/ceph_immutable_object_cache/Policy.hpp
+ create mode 100644 src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
+ create mode 100644 src/tools/ceph_immutable_object_cache/SocketCommon.h
+ create mode 100644 src/tools/ceph_immutable_object_cache/main.cc
+ delete mode 100644 src/tools/rbd_cache/CMakeLists.txt
+ delete mode 100644 src/tools/rbd_cache/CacheController.cc
+ delete mode 100644 src/tools/rbd_cache/CacheController.h
+ delete mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp
+ delete mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp
+ delete mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h
+ delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc
+ delete mode 100644 src/tools/rbd_cache/ObjectCacheStore.h
+ delete mode 100644 src/tools/rbd_cache/Policy.hpp
+ delete mode 100644 src/tools/rbd_cache/SimplePolicy.hpp
+ delete mode 100644 src/tools/rbd_cache/main.cc
+
+diff --git a/src/common/options.cc b/src/common/options.cc
+index 3172744..bf00aab 100644
+--- a/src/common/options.cc
++++ b/src/common/options.cc
+@@ -6358,7 +6358,7 @@ static std::vector<Option> get_rbd_options() {
+ .set_description("time in seconds for detecting a hung thread"),
+
+ Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+- .set_default(true)
++ .set_default(false)
+ .set_description("whether to enable shared ssd caching"),
+
+ Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED)
+diff --git a/src/common/subsys.h b/src/common/subsys.h
+index bdd2d0e..5b532c1 100644
+--- a/src/common/subsys.h
++++ b/src/common/subsys.h
+@@ -36,9 +36,10 @@ SUBSYS(objecter, 0, 1)
+ SUBSYS(rados, 0, 5)
+ SUBSYS(rbd, 0, 5)
+ SUBSYS(rbd_mirror, 0, 5)
+-SUBSYS(rbd_replay, 0, 5)
+ SUBSYS(journaler, 0, 5)
+ SUBSYS(objectcacher, 0, 5)
++SUBSYS(immutable_obj_cache, 0, 5)
++SUBSYS(rbd_replay, 0, 5)
+ SUBSYS(client, 0, 5)
+ SUBSYS(osd, 1, 5)
+ SUBSYS(optracker, 0, 5)
+diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt
+index 540ee78..c9bfb6f 100644
+--- a/src/librbd/CMakeLists.txt
++++ b/src/librbd/CMakeLists.txt
+@@ -32,7 +32,7 @@ set(librbd_internal_srcs
+ api/Snapshot.cc
+ cache/ImageWriteback.cc
+ cache/ObjectCacherObjectDispatch.cc
+- cache/SharedPersistentObjectCacherObjectDispatch.cc
++ cache/SharedReadOnlyObjectDispatch.cc
+ cache/SharedPersistentObjectCacher.cc
+ cache/SharedPersistentObjectCacherFile.cc
+ deep_copy/ImageCopyRequest.cc
+@@ -125,6 +125,7 @@ set(librbd_internal_srcs
+ trash/MoveRequest.cc
+ watcher/Notifier.cc
+ watcher/RewatchRequest.cc
++ ${CMAKE_SOURCE_DIR}/src/tools/ceph_immutable_object_cache/CacheClient.cc
+ ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc)
+
+ add_library(rbd_api STATIC librbd.cc)
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+deleted file mode 100644
+index 7cbc019..0000000
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
++++ /dev/null
+@@ -1,175 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h"
+-#include "common/WorkQueue.h"
+-#include "librbd/ImageCtx.h"
+-#include "librbd/Journal.h"
+-#include "librbd/Utils.h"
+-#include "librbd/LibrbdWriteback.h"
+-#include "librbd/io/ObjectDispatchSpec.h"
+-#include "librbd/io/ObjectDispatcher.h"
+-#include "librbd/io/Utils.h"
+-#include "osd/osd_types.h"
+-#include "osdc/WritebackHandler.h"
+-#include <vector>
+-
+-#define dout_subsys ceph_subsys_rbd
+-#undef dout_prefix
+-#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \
+- << this << " " << __func__ << ": "
+-
+-namespace librbd {
+-namespace cache {
+-
+-template <typename I>
+-SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch(
+- I* image_ctx) : m_image_ctx(image_ctx) {
+-}
+-
+-template <typename I>
+-SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
+- delete m_object_store;
+- delete m_cache_client;
+-}
+-
+-// TODO if connect fails, init will return error to high layer.
+-template <typename I>
+-void SharedPersistentObjectCacherObjectDispatch<I>::init() {
+- auto cct = m_image_ctx->cct;
+- ldout(cct, 5) << dendl;
+-
+- if (m_image_ctx->parent != nullptr) {
+- //TODO(): should we cover multi-leveled clone?
+- ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
+- return;
+- }
+-
+- ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
+-
+- std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
+- m_cache_client = new rbd::cache::CacheClient(controller_path.c_str(),
+- ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
+-
+- int ret = m_cache_client->connect();
+- if (ret < 0) {
+- ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
+- << "please start rbd-cache daemon"
+- << dendl;
+- } else {
+- ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
+- << "name = " << m_image_ctx->id
+- << dendl;
+-
+- ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
+- m_image_ctx->id, m_image_ctx->size);
+-
+- if (ret >= 0) {
+- // add ourself to the IO object dispatcher chain
+- m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
+- }
+- }
+-}
+-
+-template <typename I>
+-bool SharedPersistentObjectCacherObjectDispatch<I>::read(
+- const std::string &oid, uint64_t object_no, uint64_t object_off,
+- uint64_t object_len, librados::snap_t snap_id, int op_flags,
+- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+- io::ExtentMap* extent_map, int* object_dispatch_flags,
+- io::DispatchResult* dispatch_result, Context** on_finish,
+- Context* on_dispatched) {
+-
+- // IO chained in reverse order
+-
+- // Now, policy is : when session have any error, later all read will dispatched to rados layer.
+- if(!m_cache_client->is_session_work()) {
+- *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+- on_dispatched->complete(0);
+- return true;
+- // TODO : domain socket have error, all read operation will dispatched to rados layer.
+- }
+-
+- auto cct = m_image_ctx->cct;
+- ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
+- << object_len << dendl;
+-
+-
+- on_dispatched = util::create_async_context_callback(*m_image_ctx,
+- on_dispatched);
+- auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
+- handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
+- });
+-
+- if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
+- m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
+- m_image_ctx->id, oid, ctx);
+- }
+- return true;
+-}
+-
+-template <typename I>
+-int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
+- bool cache,
+- const std::string &oid, uint64_t object_off, uint64_t object_len,
+- ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
+- Context* on_dispatched) {
+- // IO chained in reverse order
+- auto cct = m_image_ctx->cct;
+- ldout(cct, 20) << dendl;
+-
+- // try to read from parent image
+- if (cache) {
+- int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
+- //int r = object_len;
+- if (r != 0) {
+- *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+- //TODO(): complete in syncfile
+- on_dispatched->complete(r);
+- ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
+- return true;
+- }
+- } else {
+- *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+- on_dispatched->complete(0);
+- ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
+- return false;
+- }
+-}
+-template <typename I>
+-void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
+- auto cct = m_image_ctx->cct;
+- ldout(cct, 20) << dendl;
+-
+- rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
+-
+- switch (io_ctx->type) {
+- case rbd::cache::RBDSC_REGISTER_REPLY: {
+- // open cache handler for volume
+- ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
+- m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
+-
+- break;
+- }
+- case rbd::cache::RBDSC_READ_REPLY: {
+- ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
+- //TODO(): should call read here
+-
+- break;
+- }
+- case rbd::cache::RBDSC_READ_RADOS: {
+- ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
+- //TODO(): should call read here
+-
+- break;
+- }
+- default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
+- break;
+-
+- }
+-}
+-
+-} // namespace cache
+-} // namespace librbd
+-
+-template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+deleted file mode 100644
+index 5685244..0000000
+--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
++++ /dev/null
+@@ -1,133 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
+-#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
+-
+-#include "librbd/io/ObjectDispatchInterface.h"
+-#include "common/Mutex.h"
+-#include "osdc/ObjectCacher.h"
+-#include "tools/rbd_cache/CacheControllerSocketClient.hpp"
+-#include "SharedPersistentObjectCacher.h"
+-
+-struct WritebackHandler;
+-
+-namespace librbd {
+-
+-class ImageCtx;
+-
+-namespace cache {
+-
+-/**
+- * Facade around the OSDC object cacher to make it align with
+- * the object dispatcher interface
+- */
+-template <typename ImageCtxT = ImageCtx>
+-class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface {
+-public:
+- static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) {
+- return new SharedPersistentObjectCacherObjectDispatch(image_ctx);
+- }
+-
+- SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx);
+- ~SharedPersistentObjectCacherObjectDispatch() override;
+-
+- io::ObjectDispatchLayer get_object_dispatch_layer() const override {
+- return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
+- }
+-
+- void init();
+- void shut_down(Context* on_finish) {
+- m_image_ctx->op_work_queue->queue(on_finish, 0);
+- }
+-
+- bool read(
+- const std::string &oid, uint64_t object_no, uint64_t object_off,
+- uint64_t object_len, librados::snap_t snap_id, int op_flags,
+- const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
+- io::ExtentMap* extent_map, int* object_dispatch_flags,
+- io::DispatchResult* dispatch_result, Context** on_finish,
+- Context* on_dispatched) override;
+-
+- bool discard(
+- const std::string &oid, uint64_t object_no, uint64_t object_off,
+- uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
+- const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+- uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+- Context** on_finish, Context* on_dispatched) {
+- return false;
+- }
+-
+- bool write(
+- const std::string &oid, uint64_t object_no, uint64_t object_off,
+- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+- const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+- uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+- Context** on_finish, Context* on_dispatched) {
+- return false;
+- }
+-
+- bool write_same(
+- const std::string &oid, uint64_t object_no, uint64_t object_off,
+- uint64_t object_len, io::Extents&& buffer_extents,
+- ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
+- const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
+- uint64_t* journal_tid, io::DispatchResult* dispatch_result,
+- Context** on_finish, Context* on_dispatched) {
+- return false;
+- }
+-
+- bool compare_and_write(
+- const std::string &oid, uint64_t object_no, uint64_t object_off,
+- ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
+- const ::SnapContext &snapc, int op_flags,
+- const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
+- int* object_dispatch_flags, uint64_t* journal_tid,
+- io::DispatchResult* dispatch_result, Context** on_finish,
+- Context* on_dispatched) {
+- return false;
+- }
+-
+- bool flush(
+- io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
+- io::DispatchResult* dispatch_result, Context** on_finish,
+- Context* on_dispatched) {
+- return false;
+- }
+-
+- bool invalidate_cache(Context* on_finish) {
+- return false;
+- }
+-
+- bool reset_existence_cache(Context* on_finish) {
+- return false;
+- }
+-
+- void extent_overwritten(
+- uint64_t object_no, uint64_t object_off, uint64_t object_len,
+- uint64_t journal_tid, uint64_t new_journal_tid) {
+- }
+-
+- SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
+-
+-private:
+-
+- int handle_read_cache(
+- bool cache,
+- const std::string &oid, uint64_t object_off,
+- uint64_t object_len, ceph::bufferlist* read_data,
+- io::DispatchResult* dispatch_result,
+- Context* on_dispatched);
+- void client_handle_request(std::string msg);
+-
+- ImageCtxT* m_image_ctx;
+-
+- rbd::cache::CacheClient *m_cache_client = nullptr;
+-};
+-
+-} // namespace cache
+-} // namespace librbd
+-
+-extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
+-
+-#endif // CEPH_LIBRBD_CACHE_OBJECT_CACHER_OBJECT_DISPATCH_H
+diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.cc b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc
+new file mode 100644
+index 0000000..23c7dbe
+--- /dev/null
++++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.cc
+@@ -0,0 +1,170 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "common/WorkQueue.h"
++#include "librbd/ImageCtx.h"
++#include "librbd/Journal.h"
++#include "librbd/Utils.h"
++#include "librbd/LibrbdWriteback.h"
++#include "librbd/io/ObjectDispatchSpec.h"
++#include "librbd/io/ObjectDispatcher.h"
++#include "librbd/io/Utils.h"
++#include "librbd/cache/SharedReadOnlyObjectDispatch.h"
++#include "osd/osd_types.h"
++#include "osdc/WritebackHandler.h"
++
++#include <vector>
++
++#define dout_subsys ceph_subsys_rbd
++#undef dout_prefix
++#define dout_prefix *_dout << "librbd::cache::SharedReadOnlyObjectDispatch: " \
++ << this << " " << __func__ << ": "
++
++namespace librbd {
++namespace cache {
++
++template <typename I>
++SharedReadOnlyObjectDispatch<I>::SharedReadOnlyObjectDispatch(
++ I* image_ctx) : m_image_ctx(image_ctx) {
++}
++
++template <typename I>
++SharedReadOnlyObjectDispatch<I>::~SharedReadOnlyObjectDispatch() {
++ delete m_object_store;
++ delete m_cache_client;
++}
++
++// TODO: if connect fails, init should return an error to the higher layer.
++template <typename I>
++void SharedReadOnlyObjectDispatch<I>::init() {
++ auto cct = m_image_ctx->cct;
++ ldout(cct, 5) << dendl;
++
++ if (m_image_ctx->parent != nullptr) {
++ //TODO(): should we cover multi-leveled clone?
++ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
++ return;
++ }
++
++ ldout(cct, 5) << "parent image: setup SRO cache client = " << dendl;
++
++ std::string controller_path = ((CephContext*)cct)->_conf.get_val<std::string>("rbd_shared_cache_sock");
++ m_cache_client = new ceph::immutable_obj_cache::CacheClient(controller_path.c_str(),
++ ([&](std::string s){client_handle_request(s);}), m_image_ctx->cct);
++
++ int ret = m_cache_client->connect();
++ if (ret < 0) {
++ ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
++ << "please start rbd-cache daemon"
++ << dendl;
++ } else {
++ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
++ << "name = " << m_image_ctx->id
++ << dendl;
++
++ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
++ m_image_ctx->id, m_image_ctx->size);
++
++ if (ret >= 0) {
++ // add ourself to the IO object dispatcher chain
++ m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
++ }
++ }
++}
++
++template <typename I>
++bool SharedReadOnlyObjectDispatch<I>::read(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ uint64_t object_len, librados::snap_t snap_id, int op_flags,
++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
++ io::ExtentMap* extent_map, int* object_dispatch_flags,
++ io::DispatchResult* dispatch_result, Context** on_finish,
++ Context* on_dispatched) {
++ auto cct = m_image_ctx->cct;
++ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
++ << object_len << dendl;
++
++ // if any session fails, later reads will go to rados
++ if(!m_cache_client->is_session_work()) {
++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
++ on_dispatched->complete(0);
++ return true;
++ // TODO(): fix domain socket error
++ }
++
++ auto ctx = new FunctionContext([this, oid, object_off, object_len,
++ read_data, dispatch_result, on_dispatched](bool cache) {
++ handle_read_cache(cache, oid, object_off, object_len,
++ read_data, dispatch_result, on_dispatched);
++ });
++
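++ // note: if the register reply has not been processed yet,
++ // m_object_store is still null, the lookup below is skipped and
++ // this request is neither dispatched nor completed here.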
++ if (m_cache_client && m_cache_client->is_session_work() && m_object_store) {
++ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
++ m_image_ctx->id, oid, ctx);
++ }
++ return true;
++}
++
++template <typename I>
++int SharedReadOnlyObjectDispatch<I>::handle_read_cache(
++ bool cache, const std::string &oid, uint64_t object_off,
++ uint64_t object_len, ceph::bufferlist* read_data,
++ io::DispatchResult* dispatch_result, Context* on_dispatched) {
++ auto cct = m_image_ctx->cct;
++ ldout(cct, 20) << dendl;
++
++ // try to read from the local cache file
++ if (cache) {
++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
++ if (r != 0) {
++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
++ //TODO(): complete in syncfile
++ on_dispatched->complete(r);
++ ldout(cct, 20) << "read cache: " << *dispatch_result << dendl;
++ return true;
++ }
++ }
++
++ // cache miss (or zero-length cache read): fall back to rados
++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
++ on_dispatched->complete(0);
++ ldout(cct, 20) << "read rados: " << *dispatch_result << dendl;
++ return false;
++}
++template <typename I>
++void SharedReadOnlyObjectDispatch<I>::client_handle_request(std::string msg) {
++ auto cct = m_image_ctx->cct;
++ ldout(cct, 20) << dendl;
++
++ ceph::immutable_obj_cache::rbdsc_req_type_t *io_ctx = (ceph::immutable_obj_cache::rbdsc_req_type_t*)(msg.c_str());
++
++ switch (io_ctx->type) {
++ case ceph::immutable_obj_cache::RBDSC_REGISTER_REPLY: {
++ // open cache handler for volume
++ ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
++ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
++
++ break;
++ }
++ case ceph::immutable_obj_cache::RBDSC_READ_REPLY: {
++ ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
++ //TODO(): should call read here
++
++ break;
++ }
++ case ceph::immutable_obj_cache::RBDSC_READ_RADOS: {
++ ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
++ //TODO(): should call read here
++
++ break;
++ }
++ default: ldout(cct, 20) << "unknown message type: " << io_ctx->type << dendl;
++ break;
++
++ }
++}
++
++} // namespace cache
++} // namespace librbd
++
++template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>;
+diff --git a/src/librbd/cache/SharedReadOnlyObjectDispatch.h b/src/librbd/cache/SharedReadOnlyObjectDispatch.h
+new file mode 100644
+index 0000000..9b56da9
+--- /dev/null
++++ b/src/librbd/cache/SharedReadOnlyObjectDispatch.h
+@@ -0,0 +1,126 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_LIBRBD_CACHE_SHARED_READ_ONLY_OBJECT_DISPATCH_H
++#define CEPH_LIBRBD_CACHE_SHARED_READ_ONLY_OBJECT_DISPATCH_H
++
++#include "common/Mutex.h"
++#include "SharedPersistentObjectCacher.h"
++#include "librbd/io/ObjectDispatchInterface.h"
++#include "tools/ceph_immutable_object_cache/CacheClient.h"
++
++
++namespace librbd {
++
++class ImageCtx;
++
++namespace cache {
++
++template <typename ImageCtxT = ImageCtx>
++class SharedReadOnlyObjectDispatch : public io::ObjectDispatchInterface {
++public:
++ static SharedReadOnlyObjectDispatch* create(ImageCtxT* image_ctx) {
++ return new SharedReadOnlyObjectDispatch(image_ctx);
++ }
++
++ SharedReadOnlyObjectDispatch(ImageCtxT* image_ctx);
++ ~SharedReadOnlyObjectDispatch() override;
++
++ io::ObjectDispatchLayer get_object_dispatch_layer() const override {
++ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
++ }
++
++ void init();
++ void shut_down(Context* on_finish) {
++ m_image_ctx->op_work_queue->queue(on_finish, 0);
++ }
++
++ bool read(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ uint64_t object_len, librados::snap_t snap_id, int op_flags,
++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
++ io::ExtentMap* extent_map, int* object_dispatch_flags,
++ io::DispatchResult* dispatch_result, Context** on_finish,
++ Context* on_dispatched) override;
++
++ bool discard(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
++ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
++ Context** on_finish, Context* on_dispatched) {
++ return false;
++ }
++
++ bool write(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
++ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
++ Context** on_finish, Context* on_dispatched) {
++ return false;
++ }
++
++ bool write_same(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ uint64_t object_len, io::Extents&& buffer_extents,
++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
++ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
++ Context** on_finish, Context* on_dispatched) {
++ return false;
++ }
++
++ bool compare_and_write(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
++ const ::SnapContext &snapc, int op_flags,
++ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
++ int* object_dispatch_flags, uint64_t* journal_tid,
++ io::DispatchResult* dispatch_result, Context** on_finish,
++ Context* on_dispatched) {
++ return false;
++ }
++
++ bool flush(
++ io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
++ io::DispatchResult* dispatch_result, Context** on_finish,
++ Context* on_dispatched) {
++ return false;
++ }
++
++ bool invalidate_cache(Context* on_finish) {
++ return false;
++ }
++
++ bool reset_existence_cache(Context* on_finish) {
++ return false;
++ }
++
++ void extent_overwritten(
++ uint64_t object_no, uint64_t object_off, uint64_t object_len,
++ uint64_t journal_tid, uint64_t new_journal_tid) {
++ }
++
++private:
++
++ int handle_read_cache(
++ bool cache,
++ const std::string &oid, uint64_t object_off,
++ uint64_t object_len, ceph::bufferlist* read_data,
++ io::DispatchResult* dispatch_result,
++ Context* on_dispatched);
++ void client_handle_request(std::string msg);
++
++ ImageCtxT* m_image_ctx;
++
++ ceph::immutable_obj_cache::CacheClient *m_cache_client = nullptr;
++ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
++};
++
++} // namespace cache
++} // namespace librbd
++
++extern template class librbd::cache::SharedReadOnlyObjectDispatch<librbd::ImageCtx>;
++
++#endif // CEPH_LIBRBD_CACHE_SHARED_READ_ONLY_OBJECT_DISPATCH_H
+diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc
+index 30a7b66..57ce92f 100644
+--- a/src/librbd/image/OpenRequest.cc
++++ b/src/librbd/image/OpenRequest.cc
+@@ -8,7 +8,7 @@
+ #include "librbd/ImageCtx.h"
+ #include "librbd/Utils.h"
+ #include "librbd/cache/ObjectCacherObjectDispatch.h"
+-#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc"
++#include "librbd/cache/SharedReadOnlyObjectDispatch.cc"
+ #include "librbd/image/CloseRequest.h"
+ #include "librbd/image/RefreshRequest.h"
+ #include "librbd/image/SetSnapRequest.h"
+@@ -457,7 +457,7 @@ Context *OpenRequest<I>::send_init_cache(int *result) {
+ // enable Shared Read-only cache for parent image
+ if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) {
+ ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl;
+- auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx);
++ auto sro_cache = cache::SharedReadOnlyObjectDispatch<I>::create(m_image_ctx);
+ sro_cache->init();
+ }
+
+diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
+index 72ab342..f7c5872 100644
+--- a/src/tools/CMakeLists.txt
++++ b/src/tools/CMakeLists.txt
+@@ -99,7 +99,6 @@ endif(WITH_CEPHFS)
+ if(WITH_RBD)
+ add_subdirectory(rbd)
+ add_subdirectory(rbd_mirror)
+- add_subdirectory(rbd_cache)
+ if(LINUX)
+ add_subdirectory(rbd_nbd)
+ endif()
+@@ -108,4 +107,5 @@ if(WITH_RBD)
+ endif()
+ endif(WITH_RBD)
+
++add_subdirectory(ceph_immutable_object_cache)
+ add_subdirectory(ceph-dencoder)
+diff --git a/src/tools/ceph_immutable_object_cache/CMakeLists.txt b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
+new file mode 100644
+index 0000000..c7c7af3
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CMakeLists.txt
+@@ -0,0 +1,11 @@
++add_executable(ceph-immutable-object-cache
++ ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc
++ ObjectCacheStore.cc
++ CacheController.cc
++ CacheServer.cc
++ CacheSession.cc
++ main.cc)
++target_link_libraries(ceph-immutable-object-cache
++ librados
++ global)
++install(TARGETS ceph-immutable-object-cache DESTINATION bin)
+diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.cc b/src/tools/ceph_immutable_object_cache/CacheClient.cc
+new file mode 100644
+index 0000000..a7116bf
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CacheClient.cc
+@@ -0,0 +1,205 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "CacheClient.h"
++
++#define dout_context g_ceph_context
++#define dout_subsys ceph_subsys_immutable_obj_cache
++#undef dout_prefix
++#define dout_prefix *_dout << "ceph::cache::CacheControllerSocketClient: " << this << " " \
++ << __func__ << ": "
++
++
++using boost::asio::local::stream_protocol;
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++ CacheClient::CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
++ : m_io_service_work(m_io_service),
++ m_dm_socket(m_io_service),
++ m_client_process_msg(processmsg),
++ m_ep(stream_protocol::endpoint(file)),
++ m_session_work(false),
++ cct(ceph_ctx)
++ {
++ // TODO wrapper io_service
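++ // the io_service::work member keeps m_io_service.run() from
++ // returning while the service is idle; the detached thread below
++ // is this client's asio event loop for the life of the process.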
++ std::thread thd([this](){m_io_service.run();});
++ thd.detach();
++ }
++
++ void CacheClient::run(){
++ }
++
++ bool CacheClient::is_session_work() {
++ return m_session_work.load() == true;
++ }
++
++ // called only when an error occurs.
++ void CacheClient::close() {
++ m_session_work.store(false);
++ boost::system::error_code close_ec;
++ m_dm_socket.close(close_ec);
++ if(close_ec) {
++ ldout(cct, 20) << "close: " << close_ec.message() << dendl;
++ }
++ ldout(cct, 20) << "session don't work, later all request will be dispatched to rados layer" << dendl;
++ }
++
++ int CacheClient::connect() {
++ boost::system::error_code ec;
++ m_dm_socket.connect(m_ep, ec);
++ if(ec) {
++ if(ec == boost::asio::error::connection_refused) {
++ ldout(cct, 20) << ec.message() << " : maybe rbd-cache Controller don't startup. "
++ << "Now data will be read from ceph cluster " << dendl;
++ } else {
++ ldout(cct, 20) << "connect: " << ec.message() << dendl;
++ }
++
++ if(m_dm_socket.is_open()) {
++ // Set to indicate what error occurred, if any.
++ // Note that, even if the function indicates an error,
++ // the underlying descriptor is closed.
++ boost::system::error_code close_ec;
++ m_dm_socket.close(close_ec);
++ if(close_ec) {
++ ldout(cct, 20) << "close: " << close_ec.message() << dendl;
++ }
++ }
++ return -1;
++ }
++
++ ldout(cct, 20) <<"connect success"<< dendl;
++
++ return 0;
++ }
++
++ int CacheClient::register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
++ // cache controller will init layout
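++ // the wire protocol is a fixed-size rbdsc_req_type_t message; the
++ // code below assumes message->size() equals RBDSC_MSG_LEN for both
++ // the request and the reply (see the hard-coded read length).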
++ rbdsc_req_type_t *message = new rbdsc_req_type_t();
++ message->type = RBDSC_REGISTER;
++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
++ memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
++ message->vol_size = vol_size;
++ message->offset = 0;
++ message->length = 0;
++
++ uint64_t ret;
++ boost::system::error_code ec;
++
++ ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
++ if(ec) {
++ ldout(cct, 20) << "write fails : " << ec.message() << dendl;
++ return -1;
++ }
++
++ if(ret != message->size()) {
++ ldout(cct, 20) << "write fails : ret != send_bytes " << dendl;
++ return -1;
++ }
++
++ // TODO: the reply length is hard-coded to RBDSC_MSG_LEN
++ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
++ if(ec == boost::asio::error::eof) {
++ ldout(cct, 20) << "recv eof" << dendl;
++ return -1;
++ }
++
++ if(ec) {
++ ldout(cct, 20) << "write fails : " << ec.message() << dendl;
++ return -1;
++ }
++
++ if(ret != RBDSC_MSG_LEN) {
++ ldout(cct, 20) << "write fails : ret != receive bytes " << dendl;
++ return -1;
++ }
++
++ m_client_process_msg(std::string(m_recv_buffer, ret));
++
++ delete message;
++
++ ldout(cct, 20) << "register volume success" << dendl;
++
++ // TODO
++ m_session_work.store(true);
++
++ return 0;
++ }
++
++ // on any error, complete the context with false and the caller reads from rados.
++ int CacheClient::lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
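++ // async flow: write the RBDSC_READ request, then chain get_result()
++ // to read the ack; on_finish fires with true if the object is cached
++ // locally and false if the caller must read from rados.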
++ rbdsc_req_type_t *message = new rbdsc_req_type_t();
++ message->type = RBDSC_READ;
++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
++ memcpy(message->vol_name, object_id.c_str(), object_id.size());
++ message->vol_size = 0;
++ message->offset = 0;
++ message->length = 0;
++
++ boost::asio::async_write(m_dm_socket,
++ boost::asio::buffer((char*)message, message->size()),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
++ [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
++ delete message;
++ if(err) {
++ ldout(cct, 20) << "lookup_object: async_write fails." << err.message() << dendl;
++ close();
++ on_finish->complete(false);
++ return;
++ }
++ if(cb != RBDSC_MSG_LEN) {
++ ldout(cct, 20) << "lookup_object: async_write fails. in-complete request" << dendl;
++ close();
++ on_finish->complete(false);
++ return;
++ }
++ get_result(on_finish);
++ });
++
++ return 0;
++ }
++
++ void CacheClient::get_result(Context* on_finish) {
++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
++ [this, on_finish](const boost::system::error_code& err, size_t cb) {
++ if(err == boost::asio::error::eof) {
++ ldout(cct, 20) << "get_result: ack is EOF." << dendl;
++ close();
++ on_finish->complete(false);
++ return;
++ }
++ if(err) {
++ ldout(cct, 20) << "get_result: async_read fails:" << err.message() << dendl;
++ close();
++ on_finish->complete(false); // TODO: replace this with a proper error path.
++ return;
++ }
++ if (cb != RBDSC_MSG_LEN) {
++ close();
++ ldout(cct, 20) << "get_result: incomplete ack." << dendl;
++ on_finish->complete(false); // TODO: replace this with a proper error path.
++ return;
++ }
++
++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
++
++ // TODO: reproduce yuan's bug
++ if(io_ctx->type == RBDSC_READ) {
++ ldout(cct, 20) << "get rbdsc_read... " << dendl;
++ assert(0);
++ }
++
++ if (io_ctx->type == RBDSC_READ_REPLY) {
++ on_finish->complete(true);
++ return;
++ } else {
++ on_finish->complete(false);
++ return;
++ }
++ });
++ }
++
++} // namespace immutable_obj_cache
++} // namespace ceph
+diff --git a/src/tools/ceph_immutable_object_cache/CacheClient.h b/src/tools/ceph_immutable_object_cache/CacheClient.h
+new file mode 100644
+index 0000000..d82ab8f
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CacheClient.h
+@@ -0,0 +1,53 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_CACHE_CLIENT_H
++#define CEPH_CACHE_CLIENT_H
++
++#include <atomic>
++#include <boost/asio.hpp>
++#include <boost/bind.hpp>
++#include <boost/asio/error.hpp>
++#include <boost/algorithm/string.hpp>
++#include "librbd/ImageCtx.h"
++#include "include/assert.h"
++#include "include/Context.h"
++#include "SocketCommon.h"
++
++
++using boost::asio::local::stream_protocol;
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++class CacheClient {
++public:
++ CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx);
++ void run();
++ bool is_session_work();
++
++ void close();
++ int connect();
++
++ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size);
++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish);
++ void get_result(Context* on_finish);
++
++private:
++ boost::asio::io_service m_io_service;
++ boost::asio::io_service::work m_io_service_work;
++ stream_protocol::socket m_dm_socket;
++ ClientProcessMsg m_client_process_msg;
++ stream_protocol::endpoint m_ep;
++ char m_recv_buffer[1024];
++
++ // modified atomically:
++ // thread 1 : the asio callback thread writes it.
++ // thread 2 : librbd reads it.
++ std::atomic<bool> m_session_work;
++ CephContext* cct;
++};
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++#endif
+diff --git a/src/tools/ceph_immutable_object_cache/CacheController.cc b/src/tools/ceph_immutable_object_cache/CacheController.cc
+new file mode 100644
+index 0000000..cb636d2
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CacheController.cc
+@@ -0,0 +1,117 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "CacheController.h"
++
++#define dout_context g_ceph_context
++#define dout_subsys ceph_subsys_immutable_obj_cache
++#undef dout_prefix
++#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \
++ << __func__ << ": "
++
++namespace ceph {
++namespace immutable_obj_cache {
++
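++// process-wide thread pool backing the ContextWQ used by the object
++// cache store: 32 threads by default, tunable through the
++// "pcache_threads" option; created lazily in init() via
++// lookup_or_create_singleton_object().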
++class ThreadPoolSingleton : public ThreadPool {
++public:
++ ContextWQ *op_work_queue;
++
++ explicit ThreadPoolSingleton(CephContext *cct)
++ : ThreadPool(cct, "ceph::cache::thread_pool", "tp_librbd_cache", 32,
++ "pcache_threads"),
++ op_work_queue(new ContextWQ("ceph::pcache_op_work_queue",
++ cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"),
++ this)) {
++ start();
++ }
++ ~ThreadPoolSingleton() override {
++ op_work_queue->drain();
++ delete op_work_queue;
++
++ stop();
++ }
++};
++
++
++CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
++ m_args(args), m_cct(cct) {
++
++}
++
++CacheController::~CacheController() {
++
++}
++
++int CacheController::init() {
++ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
++ "ceph::cache::thread_pool", false, m_cct);
++ pcache_op_work_queue = thread_pool_singleton->op_work_queue;
++
++ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
++ //TODO(): make this configurable
++ int r = m_object_cache_store->init(true);
++ if (r < 0) {
++ lderr(m_cct) << "init error\n" << dendl;
++ }
++ return r;
++}
++
++int CacheController::shutdown() {
++ int r = m_object_cache_store->shutdown();
++ return r;
++}
++
++void CacheController::handle_signal(int signum){}
++
++void CacheController::run() {
++ try {
++ //TODO(): use new socket path
++ std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
++ std::remove(controller_path.c_str());
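++ // remove any stale socket file from a previous run, otherwise the
++ // acceptor's bind() would fail with "address already in use".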
++
++ m_cache_server = new CacheServer(controller_path,
++ ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
++ m_cache_server->run();
++ } catch (std::exception& e) {
++ lderr(m_cct) << "Exception: " << e.what() << dendl;
++ }
++}
++
++void CacheController::handle_request(uint64_t session_id, std::string msg){
++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
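++ // note: the cases below write through this pointer, mutating the
++ // buffer returned by c_str(); copying into a local struct first
++ // would avoid the formally undefined behaviour.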
++
++ int ret = 0;
++
++ switch (io_ctx->type) {
++ case RBDSC_REGISTER: {
++ // init cache layout for volume
++ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
++ io_ctx->type = RBDSC_REGISTER_REPLY;
++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
++
++ break;
++ }
++ case RBDSC_READ: {
++ // lookup object in local cache store; on a miss the reply tells
++ // the client to read from rados instead.
++ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
++ if (ret < 0) {
++ io_ctx->type = RBDSC_READ_RADOS;
++ } else {
++ io_ctx->type = RBDSC_READ_REPLY;
++ }
++ m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
++
++ break;
++ }
++ default:
++ ldout(m_cct, 5) << "can't recognize request" << dendl;
++ assert(0); // TODO replace it.
++ }
++}
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++
++
+diff --git a/src/tools/ceph_immutable_object_cache/CacheController.h b/src/tools/ceph_immutable_object_cache/CacheController.h
+new file mode 100644
+index 0000000..837fe36
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CacheController.h
+@@ -0,0 +1,53 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_CACHE_CONTROLLER_H
++#define CEPH_CACHE_CONTROLLER_H
++
++#include "common/Formatter.h"
++#include "common/admin_socket.h"
++#include "common/debug.h"
++#include "common/errno.h"
++#include "common/ceph_context.h"
++#include "common/Mutex.h"
++#include "common/WorkQueue.h"
++#include "include/rados/librados.hpp"
++#include "include/rbd/librbd.h"
++#include "include/assert.h"
++#include "librbd/ImageCtx.h"
++#include "librbd/ImageState.h"
++#include "CacheServer.h"
++#include "ObjectCacheStore.h"
++
++#include <thread>
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++class CacheController {
++ public:
++ CacheController(CephContext *cct, const std::vector<const char*> &args);
++ ~CacheController();
++
++ int init();
++
++ int shutdown();
++
++ void handle_signal(int signum);
++
++ void run();
++
++ void handle_request(uint64_t session_id, std::string msg);
++
++ private:
++ CacheServer *m_cache_server;
++ std::vector<const char*> m_args;
++ CephContext *m_cct;
++ ObjectCacheStore *m_object_cache_store;
++ ContextWQ* pcache_op_work_queue;
++};
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++
++#endif
+diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.cc b/src/tools/ceph_immutable_object_cache/CacheServer.cc
+new file mode 100644
+index 0000000..dd2d47e
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CacheServer.cc
+@@ -0,0 +1,99 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "common/debug.h"
++#include "common/ceph_context.h"
++#include "CacheServer.h"
++
++#define dout_context g_ceph_context
++#define dout_subsys ceph_subsys_immutable_obj_cache
++#undef dout_prefix
++#define dout_prefix *_dout << "ceph::cache::CacheControllerSocket: " << this << " " \
++ << __func__ << ": "
++
++
++using boost::asio::local::stream_protocol;
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++CacheServer::CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
++ : cct(cct), m_server_process_msg(processmsg),
++ m_local_path(file), m_acceptor(m_io_service) {}
++
++CacheServer::~CacheServer(){}
++
++void CacheServer::run() {
++ bool ret;
++ ret = start_accept();
++ if(!ret) {
++ return;
++ }
++ m_io_service.run();
++}
++
++// TODO : use callback to replace this function.
++void CacheServer::send(uint64_t session_id, std::string msg) {
++ auto it = m_session_map.find(session_id);
++ if (it != m_session_map.end()) {
++ it->second->send(msg);
++ } else {
++ // TODO: why was an existing session id not found?
++ ldout(cct, 20) << "session id not found..." << dendl;
++ assert(0);
++ }
++}
++
++// creating the acceptor step by step lets us control each stage.
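++// open, bind and listen are performed explicitly so each step's error
++// can be reported separately (the all-in-one acceptor constructor
++// would just throw).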
++bool CacheServer::start_accept() {
++ boost::system::error_code ec;
++ m_acceptor.open(m_local_path.protocol(), ec);
++ if(ec) {
++ ldout(cct, 20) << "m_acceptor open fails: " << ec.message() << dendl;
++ return false;
++ }
++
++ // TODO control acceptor attribute.
++
++ m_acceptor.bind(m_local_path, ec);
++ if(ec) {
++ ldout(cct, 20) << "m_acceptor bind fails: " << ec.message() << dendl;
++ return false;
++ }
++
++ m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
++ if(ec) {
++ ldout(cct, 20) << "m_acceptor listen fails: " << ec.message() << dendl;
++ return false;
++ }
++
++ accept();
++ return true;
++}
++
++void CacheServer::accept() {
++ CacheSessionPtr new_session(new CacheSession(m_session_id, m_io_service, m_server_process_msg, cct));
++ m_acceptor.async_accept(new_session->socket(),
++ boost::bind(&CacheServer::handle_accept, this, new_session,
++ boost::asio::placeholders::error));
++}
++
++void CacheServer::handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error) {
++
++ if(error) {
++ lderr(cct) << "async accept fails : " << error.message() << dendl;
++ assert(0); // TODO
++ }
++
++ m_session_map.emplace(m_session_id, new_session);
++ // TODO : session setting
++ new_session->start();
++ m_session_id++;
++
++ // launch the next accept
++ accept();
++}
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++
+diff --git a/src/tools/ceph_immutable_object_cache/CacheServer.h b/src/tools/ceph_immutable_object_cache/CacheServer.h
+new file mode 100644
+index 0000000..6c5c133
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CacheServer.h
+@@ -0,0 +1,54 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_CACHE_SERVER_H
++#define CEPH_CACHE_SERVER_H
++
++#include <cstdio>
++#include <iostream>
++#include <array>
++#include <memory>
++#include <string>
++#include <boost/bind.hpp>
++#include <boost/asio.hpp>
++#include <boost/asio/error.hpp>
++#include <boost/algorithm/string.hpp>
++
++#include "include/assert.h"
++#include "SocketCommon.h"
++#include "CacheSession.h"
++
++
++using boost::asio::local::stream_protocol;
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++class CacheServer {
++
++ public:
++ CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct);
++ ~CacheServer();
++
++ void run();
++ void send(uint64_t session_id, std::string msg);
++
++ private:
++ bool start_accept();
++ void accept();
++ void handle_accept(CacheSessionPtr new_session, const boost::system::error_code& error);
++
++ private:
++ CephContext* cct;
++ boost::asio::io_service m_io_service; // TODO wrapper it.
++ ProcessMsg m_server_process_msg;
++ stream_protocol::endpoint m_local_path;
++ stream_protocol::acceptor m_acceptor;
++ uint64_t m_session_id = 1;
++ std::map<uint64_t, CacheSessionPtr> m_session_map;
++};
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++
++#endif
+diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.cc b/src/tools/ceph_immutable_object_cache/CacheSession.cc
+new file mode 100644
+index 0000000..6cffb41
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CacheSession.cc
+@@ -0,0 +1,115 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "common/debug.h"
++#include "common/ceph_context.h"
++#include "CacheSession.h"
++
++#define dout_context g_ceph_context
++#define dout_subsys ceph_subsys_immutable_obj_cache
++#undef dout_prefix
++#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
++ << __func__ << ": "
++
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++CacheSession::CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct)
++ : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg), cct(cct)
++ {}
++
++CacheSession::~CacheSession(){}
++
++stream_protocol::socket& CacheSession::socket() {
++ return m_dm_socket;
++}
++
++void CacheSession::start() {
++ // TODO: make the handling mode configurable; only the serial path
++ // is implemented so far.
++ if(true) {
++ serial_handing_request();
++ } else {
++ parallel_handing_request();
++ }
++}
++// flow:
++//
++// recv request --> process request --> reply ack
++// | |
++// --------------<-------------------------
++void CacheSession::serial_handing_request() {
++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
++ boost::bind(&CacheSession::handle_read,
++ shared_from_this(),
++ boost::asio::placeholders::error,
++ boost::asio::placeholders::bytes_transferred));
++}
++
++// flow :
++//
++// --> thread 1: process request
++// recv request --> thread 2: process request --> reply ack
++// --> thread n: process request
++//
++void CacheSession::parallel_handing_request() {
++ // TODO
++}
++
++void CacheSession::handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
++ // an EOF here most likely means the client side closed the socket,
++ // so the server side must stop handling requests on this session.
++ if(error == boost::asio::error::eof) {
++ ldout(cct, 20) << "session: async_read : " << error.message() << dendl;
++ return;
++ }
++
++ if(error) {
++ ldout(cct, 20) << "session: async_read fails: " << error.message() << dendl;
++ assert(0);
++ }
++
++ if(bytes_transferred != RBDSC_MSG_LEN) {
++ ldout(cct, 20) << "session : request in-complete. "<<dendl;
++ assert(0);
++ }
++
++ // TODO: asynchronous processing would make this more readable.
++ // the process-message callback eventually triggers the async send.
++ process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
++}
++
++void CacheSession::handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
++ if (error) {
++ ldout(cct, 20) << "session: async_write fails: " << error.message() << dendl;
++ assert(0);
++ }
++
++ if(bytes_transferred != RBDSC_MSG_LEN) {
++ ldout(cct, 20) << "session : reply in-complete. "<<dendl;
++ assert(0);
++ }
++
++ boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
++ boost::bind(&CacheSession::handle_read,
++ shared_from_this(),
++ boost::asio::placeholders::error,
++ boost::asio::placeholders::bytes_transferred));
++
++}
++
++void CacheSession::send(std::string msg) {
++ boost::asio::async_write(m_dm_socket,
++ boost::asio::buffer(msg.c_str(), msg.size()),
++ boost::asio::transfer_exactly(RBDSC_MSG_LEN),
++ boost::bind(&CacheSession::handle_write,
++ shared_from_this(),
++ boost::asio::placeholders::error,
++ boost::asio::placeholders::bytes_transferred));
++
++}
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++
+diff --git a/src/tools/ceph_immutable_object_cache/CacheSession.h b/src/tools/ceph_immutable_object_cache/CacheSession.h
+new file mode 100644
+index 0000000..ce2591b
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/CacheSession.h
+@@ -0,0 +1,58 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_CACHE_SESSION_H
++#define CEPH_CACHE_SESSION_H
++
++#include <iostream>
++#include <string>
++#include <boost/bind.hpp>
++#include <boost/asio.hpp>
++#include <boost/asio/error.hpp>
++#include <boost/algorithm/string.hpp>
++
++#include "include/assert.h"
++#include "SocketCommon.h"
++
++
++using boost::asio::local::stream_protocol;
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++class CacheSession : public std::enable_shared_from_this<CacheSession> {
++public:
++ CacheSession(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg, CephContext* cct);
++ ~CacheSession();
++
++ stream_protocol::socket& socket();
++ void start();
++ void serial_handing_request();
++ void parallel_handing_request();
++
++private:
++
++ void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
++
++ void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
++
++public:
++ void send(std::string msg);
++
++private:
++ uint64_t m_session_id;
++ stream_protocol::socket m_dm_socket;
++ ProcessMsg process_msg;
++ CephContext* cct;
++
++ // Buffer used to store data received from the client.
++ //std::array<char, 1024> data_;
++ char m_buffer[1024];
++};
++
++typedef std::shared_ptr<CacheSession> CacheSessionPtr;
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++
++#endif
+diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
+new file mode 100644
+index 0000000..50721ca
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.cc
+@@ -0,0 +1,172 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "ObjectCacheStore.h"
++
++#define dout_context g_ceph_context
++#define dout_subsys ceph_subsys_immutable_obj_cache
++#undef dout_prefix
++#define dout_prefix *_dout << "ceph::cache::ObjectCacheStore: " << this << " " \
++ << __func__ << ": "
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
++ : m_cct(cct), m_work_queue(work_queue),
++ m_rados(new librados::Rados()) {
++
++ uint64_t object_cache_entries =
++ cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
++
++ //TODO(): allow to set level
++ m_policy = new SimplePolicy(object_cache_entries, 0.5);
++}
++
++ObjectCacheStore::~ObjectCacheStore() {
++ delete m_policy;
++}
++
++int ObjectCacheStore::init(bool reset) {
++
++ int ret = m_rados->init_with_context(m_cct);
++ if(ret < 0) {
++ lderr(m_cct) << "fail to init Ceph context" << dendl;
++ return ret;
++ }
++
++ ret = m_rados->connect();
++ if(ret < 0 ) {
++ lderr(m_cct) << "fail to conect to cluster" << dendl;
++ return ret;
++ }
++
++ std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
++ //TODO(): check and reuse existing cache objects
++ if(reset) {
++ std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path;
++ //TODO(): to use std::filesystem
++ int r = system(cmd.c_str());
++ }
++
++ evict_thd = new std::thread([this]{this->evict_thread_body();});
++ return ret;
++}
++
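++// promote one object from rados into the local file store: read the
++// whole object (padding with '0' on -ENOENT), write it to a SyncFile
++// named <pool_name><object_name>, then move the policy entry from
++// PROMOTING to PROMOTED.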
++int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
++ int ret = 0;
++ std::string cache_file_name = pool_name + object_name;
++
++ //TODO(): lock on ioctx map
++ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
++ librados::IoCtx* io_ctx = new librados::IoCtx();
++ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
++ if (ret < 0) {
++ lderr(m_cct) << "fail to create ioctx" << dendl;
++ assert(0);
++ }
++ m_ioctxs.emplace(pool_name, io_ctx);
++ }
++
++ assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
++
++ librados::IoCtx* ioctx = m_ioctxs[pool_name];
++
++ librados::bufferlist* read_buf = new librados::bufferlist();
++ int object_size = 4096*1024; //TODO(): read config from image metadata
++
++ //TODO(): async promote
++ ret = promote_object(ioctx, object_name, read_buf, object_size);
++ if (ret == -ENOENT) {
++ read_buf->append(std::string(object_size, '0'));
++ ret = 0;
++ }
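++ // note: the pad above uses ASCII '0' characters rather than NUL
++ // bytes; presumably a placeholder until missing objects are handled
++ // properly.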
++
++ if( ret < 0) {
++ lderr(m_cct) << "fail to read from rados" << dendl;
++ return ret;
++ }
++
++ // persistent to cache
++ librbd::cache::SyncFile cache_file(m_cct, cache_file_name);
++ cache_file.open();
++ ret = cache_file.write_object_to_file(*read_buf, object_size);
++
++ // update metadata
++ assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name));
++ m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
++ assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
++
++  return ret;
++}
++
++// returns -1: the client needs to read the data from the cluster.
++// returns 0: the client can read the data directly from the cache.
++int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
++
++ std::string cache_file_name = pool_name + object_name;
++
++ CACHESTATUS ret;
++ ret = m_policy->lookup_object(cache_file_name);
++
++ switch(ret) {
++ case OBJ_CACHE_NONE:
++ return do_promote(pool_name, object_name);
++ case OBJ_CACHE_PROMOTED:
++ return 0;
++ case OBJ_CACHE_PROMOTING:
++ default:
++ return -1;
++ }
++}
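++
++// Illustrative sketch (comment only, not part of the build): how a server
++// side request handler is expected to consume the return value above.
++// handle_read_request() and reply() are hypothetical names, not symbols
++// defined in this tree; see CacheController::handle_request for the real
++// caller.
++//
++//   void handle_read_request(ObjectCacheStore* store,
++//                            std::string pool, std::string object) {
++//     if (store->lookup_object(pool, object) == 0) {
++//       reply(RBDSC_READ_REPLY);  // hit: read from the local cache file
++//     } else {
++//       reply(RBDSC_READ_RADOS);  // miss/promoting: read from the cluster
++//     }
++//   }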
++
++void ObjectCacheStore::evict_thread_body() {
++  while(m_evict_go) {
++    //TODO(): back off between sweeps instead of busy-looping
++    evict_objects();
++  }
++}
++
++
++int ObjectCacheStore::shutdown() {
++  m_evict_go = false;
++  evict_thd->join();
++  delete evict_thd;
++  m_rados->shutdown();
++  return 0;
++}
++
++int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
++ return 0;
++}
++
++int ObjectCacheStore::lock_cache(std::string vol_name) {
++ return 0;
++}
++
++int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) {
++ int ret;
++
++ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion();
++
++  ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
++  if(ret < 0) {
++    lderr(m_cct) << "failed to start rados read" << dendl;
++    read_completion->release();
++    return ret;
++  }
++  read_completion->wait_for_complete();
++  ret = read_completion->get_return_value();
++  read_completion->release();
++  return ret;
++}
++
++int ObjectCacheStore::evict_objects() {
++  std::list<std::string> obj_list;
++  m_policy->get_evict_list(&obj_list);
++  for (auto& obj: obj_list) {
++    //do_evict(obj);
++  }
++  return 0;
++}
++
++} // namespace immutable_obj_cache
++} // namespace ceph
+diff --git a/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
+new file mode 100644
+index 0000000..d044b27
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/ObjectCacheStore.h
+@@ -0,0 +1,70 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_CACHE_OBJECT_CACHE_STORE_H
++#define CEPH_CACHE_OBJECT_CACHE_STORE_H
++
++#include "common/debug.h"
++#include "common/errno.h"
++#include "common/ceph_context.h"
++#include "common/Mutex.h"
++#include "include/rados/librados.hpp"
++#include "include/rbd/librbd.h"
++#include "librbd/ImageCtx.h"
++#include "librbd/ImageState.h"
++#include "librbd/cache/SharedPersistentObjectCacherFile.h"
++#include "SimplePolicy.hpp"
++
++
++using librados::Rados;
++using librados::IoCtx;
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++typedef std::shared_ptr<librados::Rados> RadosRef;
++typedef std::shared_ptr<librados::IoCtx> IoCtxRef;
++
++class ObjectCacheStore
++{
++ public:
++ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
++ ~ObjectCacheStore();
++
++ int init(bool reset);
++
++ int shutdown();
++
++ int lookup_object(std::string pool_name, std::string object_name);
++
++ int init_cache(std::string vol_name, uint64_t vol_size);
++
++ int lock_cache(std::string vol_name);
++
++ private:
++ void evict_thread_body();
++ int evict_objects();
++
++ int do_promote(std::string pool_name, std::string object_name);
++
++ int promote_object(librados::IoCtx*, std::string object_name,
++ librados::bufferlist* read_buf,
++ uint64_t length);
++
++ CephContext *m_cct;
++ ContextWQ* m_work_queue;
++ RadosRef m_rados;
++
++
++ std::map<std::string, librados::IoCtx*> m_ioctxs;
++
++ librbd::cache::SyncFile *m_cache_file;
++
++ Policy* m_policy;
++ std::thread* evict_thd;
++ bool m_evict_go = false;
++};
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++#endif
+diff --git a/src/tools/ceph_immutable_object_cache/Policy.hpp b/src/tools/ceph_immutable_object_cache/Policy.hpp
+new file mode 100644
+index 0000000..8090202
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/Policy.hpp
+@@ -0,0 +1,33 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_CACHE_POLICY_HPP
++#define CEPH_CACHE_POLICY_HPP
++
++#include <list>
++#include <string>
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++enum CACHESTATUS {
++ OBJ_CACHE_NONE = 0,
++ OBJ_CACHE_PROMOTING,
++ OBJ_CACHE_PROMOTED,
++};
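++
++// Entry lifecycle driven by Policy implementations (see SimplePolicy):
++//
++//   OBJ_CACHE_NONE      --lookup miss, free entry claimed-->  OBJ_CACHE_PROMOTING
++//   OBJ_CACHE_PROMOTING --update_status() after promote  -->  OBJ_CACHE_PROMOTED
++//   OBJ_CACHE_PROMOTED  --eviction (get_evict_list())    -->  OBJ_CACHE_NONE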
++
++
++class Policy {
++public:
++ Policy(){}
++ virtual ~Policy(){};
++ virtual CACHESTATUS lookup_object(std::string) = 0;
++ virtual int evict_object(std::string&) = 0;
++ virtual void update_status(std::string, CACHESTATUS) = 0;
++ virtual CACHESTATUS get_status(std::string) = 0;
++ virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
++};
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++#endif
+diff --git a/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
+new file mode 100644
+index 0000000..757ee6a
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/SimplePolicy.hpp
+@@ -0,0 +1,163 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_CACHE_SIMPLE_POLICY_HPP
++#define CEPH_CACHE_SIMPLE_POLICY_HPP
++
++#include "Policy.hpp"
++#include "include/lru.h"
++#include "common/RWLock.h"
++#include "common/Mutex.h"
++
++#include <vector>
++#include <unordered_map>
++#include <string>
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++
++class SimplePolicy : public Policy {
++public:
++ SimplePolicy(uint64_t block_num, float watermark)
++ : m_watermark(watermark), m_entry_count(block_num),
++      m_cache_map_lock("ceph::immutable_obj_cache::SimplePolicy::m_cache_map_lock"),
++      m_free_list_lock("ceph::immutable_obj_cache::SimplePolicy::m_free_list_lock")
++ {
++
++ for(uint64_t i = 0; i < m_entry_count; i++) {
++ m_free_list.push_back(new Entry());
++ }
++
++ }
++
++  ~SimplePolicy() {
++    while(!m_free_list.empty()) {
++      delete m_free_list.front();
++      m_free_list.pop_front();
++    }
++    // entries still tracked in the cache map must be freed as well
++    for(auto& it : m_cache_map) {
++      delete it.second;
++    }
++  }
++
++ CACHESTATUS lookup_object(std::string cache_file_name) {
++
++ //TODO(): check race condition
++ RWLock::WLocker wlocker(m_cache_map_lock);
++
++ auto entry_it = m_cache_map.find(cache_file_name);
++ if(entry_it == m_cache_map.end()) {
++      Mutex::Locker locker(m_free_list_lock);
++      //TODO(): handle a full cache gracefully instead of asserting
++      assert(!m_free_list.empty());
++      Entry* entry = m_free_list.front();
++      m_free_list.pop_front();
++ entry->status = OBJ_CACHE_PROMOTING;
++
++ m_cache_map[cache_file_name] = entry;
++
++ return OBJ_CACHE_NONE;
++ }
++
++ Entry* entry = entry_it->second;
++
++ if(entry->status == OBJ_CACHE_PROMOTED) {
++ // touch it
++ m_promoted_lru.lru_touch(entry);
++ }
++
++ return entry->status;
++ }
++
++ int evict_object(std::string& out_cache_file_name) {
++ RWLock::WLocker locker(m_cache_map_lock);
++
++ return 1;
++ }
++
++ // TODO(): simplify the logic
++ void update_status(std::string file_name, CACHESTATUS new_status) {
++ RWLock::WLocker locker(m_cache_map_lock);
++
++ Entry* entry;
++ auto entry_it = m_cache_map.find(file_name);
++
++ // just check.
++ if(new_status == OBJ_CACHE_PROMOTING) {
++ assert(entry_it == m_cache_map.end());
++ }
++
++ assert(entry_it != m_cache_map.end());
++
++ entry = entry_it->second;
++
++ // promoting is done, so update it.
++    if(entry->status == OBJ_CACHE_PROMOTING && new_status == OBJ_CACHE_PROMOTED) {
++ m_promoted_lru.lru_insert_top(entry);
++ entry->status = new_status;
++ return;
++ }
++
++ assert(0);
++ }
++
++ // get entry status
++ CACHESTATUS get_status(std::string file_name) {
++ RWLock::RLocker locker(m_cache_map_lock);
++ auto entry_it = m_cache_map.find(file_name);
++ if(entry_it == m_cache_map.end()) {
++ return OBJ_CACHE_NONE;
++ }
++
++ return entry_it->second->status;
++ }
++
++ void get_evict_list(std::list<std::string>* obj_list) {
++ RWLock::WLocker locker(m_cache_map_lock);
++ // check free ratio, pop entries from LRU
++    if (m_free_list.size() < m_entry_count * m_watermark) {
++ int evict_num = 10; //TODO(): make this configurable
++ for(int i = 0; i < evict_num; i++) {
++      Entry* entry = static_cast<Entry*>(m_promoted_lru.lru_expire());
++ if (entry == nullptr) {
++ continue;
++ }
++ std::string file_name = entry->cache_file_name;
++ obj_list->push_back(file_name);
++
++ auto entry_it = m_cache_map.find(file_name);
++ m_cache_map.erase(entry_it);
++
++      //mark this entry as free
++      entry->status = OBJ_CACHE_NONE;
++      Mutex::Locker free_list_locker(m_free_list_lock);
++      m_free_list.push_back(entry);
++ }
++ }
++ }
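++
++  // Illustrative sketch (comment only): how the eviction thread in
++  // ObjectCacheStore is expected to drive this policy. do_evict() is a
++  // hypothetical helper that would unlink the backing cache file.
++  //
++  //   std::list<std::string> obj_list;
++  //   policy->get_evict_list(&obj_list);
++  //   for (auto& file_name : obj_list) {
++  //     do_evict(file_name);
++  //   }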
++
++private:
++
++ class Entry : public LRUObject {
++ public:
++ CACHESTATUS status;
++ Entry() : status(OBJ_CACHE_NONE){}
++ std::string cache_file_name;
++ void encode(bufferlist &bl){}
++ void decode(bufferlist::iterator &it){}
++ };
++
++ float m_watermark;
++ uint64_t m_entry_count;
++
++ std::unordered_map<std::string, Entry*> m_cache_map;
++ RWLock m_cache_map_lock;
++
++ std::deque<Entry*> m_free_list;
++ Mutex m_free_list_lock;
++
++ LRU m_promoted_lru; // include promoted, using status.
++
++};
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++#endif
+diff --git a/src/tools/ceph_immutable_object_cache/SocketCommon.h b/src/tools/ceph_immutable_object_cache/SocketCommon.h
+new file mode 100644
+index 0000000..53dca54
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/SocketCommon.h
+@@ -0,0 +1,54 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_CACHE_SOCKET_COMMON_H
++#define CEPH_CACHE_SOCKET_COMMON_H
++
++#include <cstdint>
++#include <functional>
++#include <sstream>
++#include <string>
++
++namespace ceph {
++namespace immutable_obj_cache {
++
++static const int RBDSC_REGISTER = 0X11;
++static const int RBDSC_READ = 0X12;
++static const int RBDSC_LOOKUP = 0X13;
++static const int RBDSC_REGISTER_REPLY = 0X14;
++static const int RBDSC_READ_REPLY = 0X15;
++static const int RBDSC_LOOKUP_REPLY = 0X16;
++static const int RBDSC_READ_RADOS = 0X17;
++
++
++
++typedef std::function<void(uint64_t, std::string)> ProcessMsg;
++typedef std::function<void(std::string)> ClientProcessMsg;
++typedef uint8_t rbdsc_req_type;
++
++//TODO(): switch to bufferlist
++struct rbdsc_req_type_t {
++ rbdsc_req_type type;
++ uint64_t vol_size;
++ uint64_t offset;
++ uint64_t length;
++ char pool_name[256];
++ char vol_name[256];
++
++ uint64_t size() {
++ return sizeof(rbdsc_req_type_t);
++ }
++
++ std::string to_buffer() {
++ std::stringstream ss;
++ ss << type;
++ ss << vol_size;
++ ss << offset;
++ ss << length;
++ ss << pool_name;
++ ss << vol_name;
++
++ return ss.str();
++ }
++};
++
++static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
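++
++// Illustrative sketch (comment only): building a lookup request in this wire
++// format. The struct is fixed-size, so exactly RBDSC_MSG_LEN bytes travel on
++// the socket in each direction (see CacheClient and CacheSession).
++//
++//   rbdsc_req_type_t req = {};
++//   req.type = RBDSC_READ;
++//   snprintf(req.pool_name, sizeof(req.pool_name), "%s", pool.c_str());
++//   snprintf(req.vol_name, sizeof(req.vol_name), "%s", object_id.c_str());
++//   boost::asio::write(socket, boost::asio::buffer(&req, RBDSC_MSG_LEN));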
++
++} // namespace immutable_obj_cache
++} // namespace ceph
++#endif
+diff --git a/src/tools/ceph_immutable_object_cache/main.cc b/src/tools/ceph_immutable_object_cache/main.cc
+new file mode 100644
+index 0000000..7a9131d
+--- /dev/null
++++ b/src/tools/ceph_immutable_object_cache/main.cc
+@@ -0,0 +1,85 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "common/ceph_argparse.h"
++#include "common/config.h"
++#include "common/debug.h"
++#include "common/errno.h"
++#include "global/global_init.h"
++#include "global/signal_handler.h"
++#include "CacheController.h"
++
++#include <vector>
++
++ceph::immutable_obj_cache::CacheController *cachectl = nullptr;
++
++void usage() {
++ std::cout << "usage: cache controller [options...]" << std::endl;
++ std::cout << "options:\n";
++ std::cout << " -m monaddress[:port] connect to specified monitor\n";
++ std::cout << " --keyring=<path> path to keyring for local cluster\n";
++ std::cout << " --log-file=<logfile> file to log debug output\n";
++  std::cout << "  --debug-rbd-cachecontroller=<log-level>/<memory-level> set cache controller debug level\n";
++ generic_server_usage();
++}
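++
++// Example invocation (assuming the binary built from this directory's
++// CMakeLists.txt is installed as ceph-immutable-object-cache):
++//
++//   ceph-immutable-object-cache -m 192.168.0.1:6789 \
++//       --keyring=/etc/ceph/ceph.client.admin.keyring \
++//       --log-file=/var/log/ceph/cache-controller.log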
++
++static void handle_signal(int signum)
++{
++ if (cachectl)
++ cachectl->handle_signal(signum);
++}
++
++int main(int argc, const char **argv)
++{
++ std::vector<const char*> args;
++ env_to_vec(args);
++ argv_to_vec(argc, argv, args);
++
++ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
++ CODE_ENVIRONMENT_DAEMON,
++ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
++
++ for (auto i = args.begin(); i != args.end(); ++i) {
++ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
++ usage();
++ return EXIT_SUCCESS;
++ }
++ }
++
++ if (g_conf()->daemonize) {
++ global_init_daemonize(g_ceph_context);
++ }
++ g_ceph_context->enable_perf_counter();
++
++ common_init_finish(g_ceph_context);
++
++ init_async_signal_handler();
++ register_async_signal_handler(SIGHUP, sighup_handler);
++ register_async_signal_handler_oneshot(SIGINT, handle_signal);
++ register_async_signal_handler_oneshot(SIGTERM, handle_signal);
++
++ std::vector<const char*> cmd_args;
++ argv_to_vec(argc, argv, cmd_args);
++
++ // disable unnecessary librbd cache
++ g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
++
++ cachectl = new ceph::immutable_obj_cache::CacheController(g_ceph_context, cmd_args);
++ int r = cachectl->init();
++ if (r < 0) {
++ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
++ goto cleanup;
++ }
++
++ cachectl->run();
++
++ cleanup:
++ unregister_async_signal_handler(SIGHUP, sighup_handler);
++ unregister_async_signal_handler(SIGINT, handle_signal);
++ unregister_async_signal_handler(SIGTERM, handle_signal);
++ shutdown_async_signal_handler();
++
++ delete cachectl;
++
++  return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
++}
+diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt
+deleted file mode 100644
+index 597d802..0000000
+--- a/src/tools/rbd_cache/CMakeLists.txt
++++ /dev/null
+@@ -1,9 +0,0 @@
+-add_executable(rbd-cache
+- ${CMAKE_SOURCE_DIR}/src/librbd/cache/SharedPersistentObjectCacherFile.cc
+- ObjectCacheStore.cc
+- CacheController.cc
+- main.cc)
+-target_link_libraries(rbd-cache
+- librados
+- global)
+-install(TARGETS rbd-cache DESTINATION bin)
+diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
+deleted file mode 100644
+index 620192c..0000000
+--- a/src/tools/rbd_cache/CacheController.cc
++++ /dev/null
+@@ -1,116 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#include "CacheController.h"
+-
+-#define dout_context g_ceph_context
+-#define dout_subsys ceph_subsys_rbd_cache
+-#undef dout_prefix
+-#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \
+- << __func__ << ": "
+-
+-namespace rbd {
+-namespace cache {
+-
+-class ThreadPoolSingleton : public ThreadPool {
+-public:
+- ContextWQ *op_work_queue;
+-
+- explicit ThreadPoolSingleton(CephContext *cct)
+- : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32,
+- "pcache_threads"),
+- op_work_queue(new ContextWQ("librbd::pcache_op_work_queue",
+- cct->_conf.get_val<int64_t>("rbd_op_thread_timeout"),
+- this)) {
+- start();
+- }
+- ~ThreadPoolSingleton() override {
+- op_work_queue->drain();
+- delete op_work_queue;
+-
+- stop();
+- }
+-};
+-
+-
+-CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
+- m_args(args), m_cct(cct) {
+-
+-}
+-
+-CacheController::~CacheController() {
+-
+-}
+-
+-int CacheController::init() {
+- ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+- "rbd::cache::thread_pool", false, m_cct);
+- pcache_op_work_queue = thread_pool_singleton->op_work_queue;
+-
+- m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
+- int r = m_object_cache_store->init(false);
+- if (r < 0) {
+- //derr << "init error\n" << dendl;
+- }
+- return r;
+-}
+-
+-int CacheController::shutdown() {
+- int r = m_object_cache_store->shutdown();
+- return r;
+-}
+-
+-void CacheController::handle_signal(int signum){}
+-
+-void CacheController::run() {
+- try {
+- //TODO(): use new socket path
+- std::string controller_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_sock");
+- std::remove(controller_path.c_str());
+-
+- m_cache_server = new CacheServer(controller_path,
+- ([&](uint64_t p, std::string s){handle_request(p, s);}), m_cct);
+- m_cache_server->run();
+- } catch (std::exception& e) {
+- std::cerr << "Exception: " << e.what() << "\n";
+- }
+-}
+-
+-void CacheController::handle_request(uint64_t session_id, std::string msg){
+- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
+-
+- int ret = 0;
+-
+- switch (io_ctx->type) {
+- case RBDSC_REGISTER: {
+- // init cache layout for volume
+- m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
+- io_ctx->type = RBDSC_REGISTER_REPLY;
+- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+-
+- break;
+- }
+- case RBDSC_READ: {
+- // lookup object in local cache store
+- ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
+- if (ret < 0) {
+- io_ctx->type = RBDSC_READ_RADOS;
+- } else {
+- io_ctx->type = RBDSC_READ_REPLY;
+- }
+- if (io_ctx->type != RBDSC_READ_REPLY) {
+- assert(0);
+- }
+- m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
+-
+- break;
+- }
+- std::cout<<"can't recongize request"<<std::endl;
+- assert(0); // TODO replace it.
+- }
+-}
+-
+-} // namespace rbd
+-} // namespace cache
+-
+-
+diff --git a/src/tools/rbd_cache/CacheController.h b/src/tools/rbd_cache/CacheController.h
+deleted file mode 100644
+index 0e23484..0000000
+--- a/src/tools/rbd_cache/CacheController.h
++++ /dev/null
+@@ -1,54 +0,0 @@
+-#ifndef CACHE_CONTROLLER_H
+-#define CACHE_CONTROLLER_H
+-
+-#include <thread>
+-
+-#include "common/Formatter.h"
+-#include "common/admin_socket.h"
+-#include "common/debug.h"
+-#include "common/errno.h"
+-#include "common/ceph_context.h"
+-#include "common/Mutex.h"
+-#include "common/WorkQueue.h"
+-#include "include/rados/librados.hpp"
+-#include "include/rbd/librbd.h"
+-#include "include/assert.h"
+-#include "librbd/ImageCtx.h"
+-#include "librbd/ImageState.h"
+-
+-#include "CacheControllerSocket.hpp"
+-#include "ObjectCacheStore.h"
+-
+-
+-using boost::asio::local::stream_protocol;
+-
+-namespace rbd {
+-namespace cache {
+-
+-class CacheController {
+- public:
+- CacheController(CephContext *cct, const std::vector<const char*> &args);
+- ~CacheController();
+-
+- int init();
+-
+- int shutdown();
+-
+- void handle_signal(int sinnum);
+-
+- void run();
+-
+- void handle_request(uint64_t sesstion_id, std::string msg);
+-
+- private:
+- CacheServer *m_cache_server;
+- std::vector<const char*> m_args;
+- CephContext *m_cct;
+- ObjectCacheStore *m_object_cache_store;
+- ContextWQ* pcache_op_work_queue;
+-};
+-
+-} // namespace rbd
+-} // namespace cache
+-
+-#endif
+diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
+deleted file mode 100644
+index 2ff7477..0000000
+--- a/src/tools/rbd_cache/CacheControllerSocket.hpp
++++ /dev/null
+@@ -1,228 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#ifndef CACHE_CONTROLLER_SOCKET_H
+-#define CACHE_CONTROLLER_SOCKET_H
+-
+-#include <cstdio>
+-#include <iostream>
+-#include <array>
+-#include <memory>
+-#include <string>
+-#include <boost/bind.hpp>
+-#include <boost/asio.hpp>
+-#include <boost/asio/error.hpp>
+-#include <boost/algorithm/string.hpp>
+-#include "CacheControllerSocketCommon.h"
+-
+-
+-using boost::asio::local::stream_protocol;
+-
+-namespace rbd {
+-namespace cache {
+-
+-class session : public std::enable_shared_from_this<session> {
+-public:
+- session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
+- : m_session_id(session_id), m_dm_socket(io_service), process_msg(processmsg) {}
+-
+- stream_protocol::socket& socket() {
+- return m_dm_socket;
+- }
+-
+- void start() {
+- if(true) {
+- serial_handing_request();
+- } else {
+- parallel_handing_request();
+- }
+- }
+- // flow:
+- //
+- // recv request --> process request --> reply ack
+- // | |
+- // --------------<-------------------------
+- void serial_handing_request() {
+- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer, RBDSC_MSG_LEN),
+- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+- boost::bind(&session::handle_read,
+- shared_from_this(),
+- boost::asio::placeholders::error,
+- boost::asio::placeholders::bytes_transferred));
+- }
+-
+- // flow :
+- //
+- // --> thread 1: process request
+- // recv request --> thread 2: process request --> reply ack
+- // --> thread n: process request
+- //
+- void parallel_handing_request() {
+- // TODO
+- }
+-
+-private:
+-
+- void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
+- // when recv eof, the most proble is that client side close socket.
+- // so, server side need to end handing_request
+- if(error == boost::asio::error::eof) {
+- std::cout<<"session: async_read : " << error.message() << std::endl;
+- return;
+- }
+-
+- if(error) {
+- std::cout<<"session: async_read fails: " << error.message() << std::endl;
+- assert(0);
+- }
+-
+- if(bytes_transferred != RBDSC_MSG_LEN) {
+- std::cout<<"session : request in-complete. "<<std::endl;
+- assert(0);
+- }
+-
+- // TODO async_process can increse coding readable.
+- // process_msg_callback call handle async_send
+- process_msg(m_session_id, std::string(m_buffer, bytes_transferred));
+- }
+-
+- void handle_write(const boost::system::error_code& error, size_t bytes_transferred) {
+- if (error) {
+- std::cout<<"session: async_write fails: " << error.message() << std::endl;
+- assert(0);
+- }
+-
+- if(bytes_transferred != RBDSC_MSG_LEN) {
+- std::cout<<"session : reply in-complete. "<<std::endl;
+- assert(0);
+- }
+-
+- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_buffer),
+- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+- boost::bind(&session::handle_read,
+- shared_from_this(),
+- boost::asio::placeholders::error,
+- boost::asio::placeholders::bytes_transferred));
+-
+- }
+-
+-public:
+- void send(std::string msg) {
+- boost::asio::async_write(m_dm_socket,
+- boost::asio::buffer(msg.c_str(), msg.size()),
+- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+- boost::bind(&session::handle_write,
+- shared_from_this(),
+- boost::asio::placeholders::error,
+- boost::asio::placeholders::bytes_transferred));
+-
+- }
+-
+-private:
+- uint64_t m_session_id;
+- stream_protocol::socket m_dm_socket;
+- ProcessMsg process_msg;
+-
+- // Buffer used to store data received from the client.
+- //std::array<char, 1024> data_;
+- char m_buffer[1024];
+-};
+-
+-typedef std::shared_ptr<session> session_ptr;
+-
+-class CacheServer {
+-public:
+- CacheServer(const std::string& file, ProcessMsg processmsg, CephContext* cct)
+- : m_cct(cct), m_server_process_msg(processmsg),
+- m_local_path(file),
+- m_acceptor(m_io_service)
+- {}
+-
+- void run() {
+- bool ret;
+- ret = start_accept();
+- if(!ret) {
+- return;
+- }
+- m_io_service.run();
+- }
+-
+- // TODO : use callback to replace this function.
+- void send(uint64_t session_id, std::string msg) {
+- auto it = m_session_map.find(session_id);
+- if (it != m_session_map.end()) {
+- it->second->send(msg);
+- } else {
+- // TODO : why don't find existing session id ?
+- std::cout<<"don't find session id..."<<std::endl;
+- assert(0);
+- }
+- }
+-
+-private:
+- // when creating one acceptor, can control every step in this way.
+- bool start_accept() {
+- boost::system::error_code ec;
+- m_acceptor.open(m_local_path.protocol(), ec);
+- if(ec) {
+- std::cout << "m_acceptor open fails: " << ec.message() << std::endl;
+- return false;
+- }
+-
+- // TODO control acceptor attribute.
+-
+- m_acceptor.bind(m_local_path, ec);
+- if(ec) {
+- std::cout << "m_acceptor bind fails: " << ec.message() << std::endl;
+- return false;
+- }
+-
+- m_acceptor.listen(boost::asio::socket_base::max_connections, ec);
+- if(ec) {
+- std::cout << "m_acceptor listen fails: " << ec.message() << std::endl;
+- return false;
+- }
+-
+- accept();
+- return true;
+- }
+-
+- void accept() {
+- session_ptr new_session(new session(m_session_id, m_io_service, m_server_process_msg));
+- m_acceptor.async_accept(new_session->socket(),
+- boost::bind(&CacheServer::handle_accept, this, new_session,
+- boost::asio::placeholders::error));
+- }
+-
+- void handle_accept(session_ptr new_session, const boost::system::error_code& error) {
+- //TODO(): open librbd snap ... yuan
+-
+- if(error) {
+- std::cout << "async accept fails : " << error.message() << std::endl;
+- assert(0); // TODO
+- }
+-
+- // must put session into m_session_map at the front of session.start()
+- m_session_map.emplace(m_session_id, new_session);
+- // TODO : session setting
+- new_session->start();
+- m_session_id++;
+-
+- // lanuch next accept
+- accept();
+- }
+-
+-private:
+- CephContext* m_cct;
+- boost::asio::io_service m_io_service; // TODO wrapper it.
+- ProcessMsg m_server_process_msg;
+- stream_protocol::endpoint m_local_path;
+- stream_protocol::acceptor m_acceptor;
+- uint64_t m_session_id = 1;
+- std::map<uint64_t, session_ptr> m_session_map;
+-};
+-
+-} // namespace cache
+-} // namespace rbd
+-
+-#endif
+diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+deleted file mode 100644
+index 964f888..0000000
+--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
++++ /dev/null
+@@ -1,229 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
+-#define CACHE_CONTROLLER_SOCKET_CLIENT_H
+-
+-#include <atomic>
+-#include <boost/asio.hpp>
+-#include <boost/bind.hpp>
+-#include <boost/asio/error.hpp>
+-#include <boost/algorithm/string.hpp>
+-#include "librbd/ImageCtx.h"
+-#include "include/assert.h"
+-#include "include/Context.h"
+-#include "CacheControllerSocketCommon.h"
+-
+-
+-using boost::asio::local::stream_protocol;
+-
+-namespace rbd {
+-namespace cache {
+-
+-class CacheClient {
+-public:
+- CacheClient(const std::string& file, ClientProcessMsg processmsg, CephContext* ceph_ctx)
+- : m_io_service_work(m_io_service),
+- m_dm_socket(m_io_service),
+- m_client_process_msg(processmsg),
+- m_ep(stream_protocol::endpoint(file)),
+- m_session_work(false),
+- cct(ceph_ctx)
+- {
+- // TODO wrapper io_service
+- std::thread thd([this](){
+- m_io_service.run();});
+- thd.detach();
+- }
+-
+- void run(){
+- }
+-
+- bool is_session_work() {
+- return m_session_work.load() == true;
+- }
+-
+- // just when error occur, call this method.
+- void close() {
+- m_session_work.store(false);
+- boost::system::error_code close_ec;
+- m_dm_socket.close(close_ec);
+- if(close_ec) {
+- std::cout << "close: " << close_ec.message() << std::endl;
+- }
+- std::cout << "session don't work, later all request will be dispatched to rados layer" << std::endl;
+- }
+-
+- int connect() {
+- boost::system::error_code ec;
+- m_dm_socket.connect(m_ep, ec);
+- if(ec) {
+- if(ec == boost::asio::error::connection_refused) {
+- std::cout << ec.message() << " : maybe rbd-cache Controller don't startup. "
+- << "Now data will be read from ceph cluster " << std::endl;
+- } else {
+- std::cout << "connect: " << ec.message() << std::endl;
+- }
+-
+- if(m_dm_socket.is_open()) {
+- // Set to indicate what error occurred, if any.
+- // Note that, even if the function indicates an error,
+- // the underlying descriptor is closed.
+- boost::system::error_code close_ec;
+- m_dm_socket.close(close_ec);
+- if(close_ec) {
+- std::cout << "close: " << close_ec.message() << std::endl;
+- }
+- }
+- return -1;
+- }
+-
+- std::cout<<"connect success"<<std::endl;
+-
+- return 0;
+- }
+-
+- int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
+- // cache controller will init layout
+- rbdsc_req_type_t *message = new rbdsc_req_type_t();
+- message->type = RBDSC_REGISTER;
+- memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
+- memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
+- message->vol_size = vol_size;
+- message->offset = 0;
+- message->length = 0;
+-
+- uint64_t ret;
+- boost::system::error_code ec;
+-
+- ret = boost::asio::write(m_dm_socket, boost::asio::buffer((char*)message, message->size()), ec);
+- if(ec) {
+- std::cout << "write fails : " << ec.message() << std::endl;
+- return -1;
+- }
+-
+- if(ret != message->size()) {
+- std::cout << "write fails : ret != send_bytes "<< std::endl;
+- return -1;
+- }
+-
+- // hard code TODO
+- ret = boost::asio::read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN), ec);
+- if(ec == boost::asio::error::eof) {
+- std::cout<< "recv eof"<<std::endl;
+- return -1;
+- }
+-
+- if(ec) {
+- std::cout << "write fails : " << ec.message() << std::endl;
+- return -1;
+- }
+-
+- if(ret != RBDSC_MSG_LEN) {
+- std::cout << "write fails : ret != receive bytes " << std::endl;
+- return -1;
+- }
+-
+- m_client_process_msg(std::string(m_recv_buffer, ret));
+-
+- delete message;
+-
+- std::cout << "register volume success" << std::endl;
+-
+- // TODO
+- m_session_work.store(true);
+-
+- return 0;
+- }
+-
+- // if occur any error, we just return false. Then read from rados.
+- int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
+- rbdsc_req_type_t *message = new rbdsc_req_type_t();
+- message->type = RBDSC_READ;
+- memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
+- memcpy(message->vol_name, object_id.c_str(), object_id.size());
+- message->vol_size = 0;
+- message->offset = 0;
+- message->length = 0;
+-
+- boost::asio::async_write(m_dm_socket,
+- boost::asio::buffer((char*)message, message->size()),
+- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+- [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
+- delete message;
+- if(err) {
+- std::cout<< "lookup_object: async_write fails." << err.message() << std::endl;
+- close();
+- on_finish->complete(false);
+- return;
+- }
+- if(cb != RBDSC_MSG_LEN) {
+- std::cout<< "lookup_object: async_write fails. in-complete request" <<std::endl;
+- close();
+- on_finish->complete(false);
+- return;
+- }
+- get_result(on_finish);
+- });
+-
+- return 0;
+- }
+-
+- void get_result(Context* on_finish) {
+- boost::asio::async_read(m_dm_socket, boost::asio::buffer(m_recv_buffer, RBDSC_MSG_LEN),
+- boost::asio::transfer_exactly(RBDSC_MSG_LEN),
+- [this, on_finish](const boost::system::error_code& err, size_t cb) {
+- if(err == boost::asio::error::eof) {
+- std::cout<<"get_result: ack is EOF." << std::endl;
+- close();
+- on_finish->complete(false);
+- return;
+- }
+- if(err) {
+- std::cout<< "get_result: async_read fails:" << err.message() << std::endl;
+- close();
+- on_finish->complete(false); // TODO replace this assert with some metohds.
+- return;
+- }
+- if (cb != RBDSC_MSG_LEN) {
+- close();
+- std::cout << "get_result: in-complete ack." << std::endl;
+- on_finish->complete(false); // TODO: replace this assert with some methods.
+- }
+-
+- rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(m_recv_buffer);
+-
+- // TODO: re-occur yuan's bug
+- if(io_ctx->type == RBDSC_READ) {
+- std::cout << "get rbdsc_read... " << std::endl;
+- assert(0);
+- }
+-
+- if (io_ctx->type == RBDSC_READ_REPLY) {
+- on_finish->complete(true);
+- return;
+- } else {
+- on_finish->complete(false);
+- return;
+- }
+- });
+- }
+-
+-private:
+- boost::asio::io_service m_io_service;
+- boost::asio::io_service::work m_io_service_work;
+- stream_protocol::socket m_dm_socket;
+- ClientProcessMsg m_client_process_msg;
+- stream_protocol::endpoint m_ep;
+- char m_recv_buffer[1024];
+-
+- // atomic modfiy for this variable.
+- // thread 1 : asio callback thread modify it.
+- // thread 2 : librbd read it.
+- std::atomic<bool> m_session_work;
+- CephContext* cct;
+-};
+-
+-} // namespace cache
+-} // namespace rbd
+-#endif
+diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+deleted file mode 100644
+index e17529a..0000000
+--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
++++ /dev/null
+@@ -1,62 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
+-#define CACHE_CONTROLLER_SOCKET_COMMON_H
+-
+-/*
+-#define RBDSC_REGISTER 0X11
+-#define RBDSC_READ 0X12
+-#define RBDSC_LOOKUP 0X13
+-#define RBDSC_REGISTER_REPLY 0X14
+-#define RBDSC_READ_REPLY 0X15
+-#define RBDSC_LOOKUP_REPLY 0X16
+-#define RBDSC_READ_RADOS 0X17
+-*/
+-
+-namespace rbd {
+-namespace cache {
+-
+-static const int RBDSC_REGISTER = 0X11;
+-static const int RBDSC_READ = 0X12;
+-static const int RBDSC_LOOKUP = 0X13;
+-static const int RBDSC_REGISTER_REPLY = 0X14;
+-static const int RBDSC_READ_REPLY = 0X15;
+-static const int RBDSC_LOOKUP_REPLY = 0X16;
+-static const int RBDSC_READ_RADOS = 0X17;
+-
+-
+-
+-typedef std::function<void(uint64_t, std::string)> ProcessMsg;
+-typedef std::function<void(std::string)> ClientProcessMsg;
+-typedef uint8_t rbdsc_req_type;
+-struct rbdsc_req_type_t {
+- rbdsc_req_type type;
+- uint64_t vol_size;
+- uint64_t offset;
+- uint64_t length;
+- char pool_name[256];
+- char vol_name[256];
+-
+- uint64_t size() {
+- return sizeof(rbdsc_req_type_t);
+- }
+-
+- std::string to_buffer() {
+- std::stringstream ss;
+- ss << type;
+- ss << vol_size;
+- ss << offset;
+- ss << length;
+- ss << pool_name;
+- ss << vol_name;
+-
+- return ss.str();
+- }
+-};
+-
+-static const int RBDSC_MSG_LEN = sizeof(rbdsc_req_type_t);
+-
+-} // namespace cache
+-} // namespace rbd
+-#endif
+diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc
+deleted file mode 100644
+index 99f90d6..0000000
+--- a/src/tools/rbd_cache/ObjectCacheStore.cc
++++ /dev/null
+@@ -1,172 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#include "ObjectCacheStore.h"
+-
+-#define dout_context g_ceph_context
+-#define dout_subsys ceph_subsys_rbd_cache
+-#undef dout_prefix
+-#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \
+- << __func__ << ": "
+-
+-namespace rbd {
+-namespace cache {
+-
+-ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
+- : m_cct(cct), m_work_queue(work_queue),
+- m_rados(new librados::Rados()) {
+-
+- uint64_t object_cache_entries =
+- cct->_conf.get_val<int64_t>("rbd_shared_cache_entries");
+-
+- //TODO(): allow to set level
+- m_policy = new SimplePolicy(object_cache_entries, 0.5);
+-}
+-
+-ObjectCacheStore::~ObjectCacheStore() {
+- delete m_policy;
+-}
+-
+-int ObjectCacheStore::init(bool reset) {
+-
+- int ret = m_rados->init_with_context(m_cct);
+- if(ret < 0) {
+- lderr(m_cct) << "fail to init Ceph context" << dendl;
+- return ret;
+- }
+-
+- ret = m_rados->connect();
+- if(ret < 0 ) {
+- lderr(m_cct) << "fail to conect to cluster" << dendl;
+- return ret;
+- }
+-
+- std::string cache_path = m_cct->_conf.get_val<std::string>("rbd_shared_cache_path");
+- //TODO(): check and reuse existing cache objects
+- if(reset) {
+- std::string cmd = "exec rm -rf " + cache_path + "/rbd_cache*; exec mkdir -p " + cache_path;
+- //TODO(): to use std::filesystem
+- int r = system(cmd.c_str());
+- }
+-
+- evict_thd = new std::thread([this]{this->evict_thread_body();});
+- return ret;
+-}
+-
+-int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
+- int ret = 0;
+- std::string cache_file_name = pool_name + object_name;
+-
+- //TODO(): lock on ioctx map
+- if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
+- librados::IoCtx* io_ctx = new librados::IoCtx();
+- ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
+- if (ret < 0) {
+- lderr(m_cct) << "fail to create ioctx" << dendl;
+- assert(0);
+- }
+- m_ioctxs.emplace(pool_name, io_ctx);
+- }
+-
+- assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
+-
+- librados::IoCtx* ioctx = m_ioctxs[pool_name];
+-
+- librados::bufferlist* read_buf = new librados::bufferlist();
+- int object_size = 4096*1024; //TODO(): read config from image metadata
+-
+- //TODO(): async promote
+- ret = promote_object(ioctx, object_name, read_buf, object_size);
+- if (ret == -ENOENT) {
+- read_buf->append(std::string(object_size, '0'));
+- ret = 0;
+- }
+-
+- if( ret < 0) {
+- lderr(m_cct) << "fail to read from rados" << dendl;
+- return ret;
+- }
+-
+- // persistent to cache
+- librbd::cache::SyncFile cache_file(m_cct, cache_file_name);
+- cache_file.open();
+- ret = cache_file.write_object_to_file(*read_buf, object_size);
+-
+- // update metadata
+- assert(OBJ_CACHE_PROMOTING == m_policy->get_status(cache_file_name));
+- m_policy->update_status(cache_file_name, OBJ_CACHE_PROMOTED);
+- assert(OBJ_CACHE_PROMOTED == m_policy->get_status(cache_file_name));
+-
+- return ret;
+-
+-}
+-
+-// return -1, client need to read data from cluster.
+-// return 0, client directly read data from cache.
+-int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
+-
+- std::string cache_file_name = pool_name + object_name;
+-
+- CACHESTATUS ret;
+- ret = m_policy->lookup_object(cache_file_name);
+-
+- switch(ret) {
+- case OBJ_CACHE_NONE:
+- return do_promote(pool_name, object_name);
+- case OBJ_CACHE_PROMOTED:
+- return 0;
+- case OBJ_CACHE_PROMOTING:
+- default:
+- return -1;
+- }
+-}
+-
+-void ObjectCacheStore::evict_thread_body() {
+- int ret;
+- while(m_evict_go) {
+- ret = evict_objects();
+- }
+-}
+-
+-
+-int ObjectCacheStore::shutdown() {
+- m_evict_go = false;
+- evict_thd->join();
+- m_rados->shutdown();
+- return 0;
+-}
+-
+-int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
+- return 0;
+-}
+-
+-int ObjectCacheStore::lock_cache(std::string vol_name) {
+- return 0;
+-}
+-
+-int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist* read_buf, uint64_t read_len) {
+- int ret;
+-
+- librados::AioCompletion* read_completion = librados::Rados::aio_create_completion();
+-
+- ret = ioctx->aio_read(object_name, read_completion, read_buf, read_len, 0);
+- if(ret < 0) {
+- lderr(m_cct) << "fail to read from rados" << dendl;
+- return ret;
+- }
+- read_completion->wait_for_complete();
+- ret = read_completion->get_return_value();
+- return ret;
+-
+-}
+-
+-int ObjectCacheStore::evict_objects() {
+- std::list<std::string> obj_list;
+- m_policy->get_evict_list(&obj_list);
+- for (auto& obj: obj_list) {
+- //do_evict(obj);
+- }
+-}
+-
+-} // namespace cache
+-} // namespace rbd
+diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h
+deleted file mode 100644
+index ba0e1f1..0000000
+--- a/src/tools/rbd_cache/ObjectCacheStore.h
++++ /dev/null
+@@ -1,70 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#ifndef OBJECT_CACHE_STORE_H
+-#define OBJECT_CACHE_STORE_H
+-
+-#include "common/debug.h"
+-#include "common/errno.h"
+-#include "common/ceph_context.h"
+-#include "common/Mutex.h"
+-#include "include/rados/librados.hpp"
+-#include "include/rbd/librbd.h"
+-#include "librbd/ImageCtx.h"
+-#include "librbd/ImageState.h"
+-#include "librbd/cache/SharedPersistentObjectCacherFile.h"
+-#include "SimplePolicy.hpp"
+-
+-
+-using librados::Rados;
+-using librados::IoCtx;
+-
+-namespace rbd {
+-namespace cache {
+-
+-typedef shared_ptr<librados::Rados> RadosRef;
+-typedef shared_ptr<librados::IoCtx> IoCtxRef;
+-
+-class ObjectCacheStore
+-{
+- public:
+- ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
+- ~ObjectCacheStore();
+-
+- int init(bool reset);
+-
+- int shutdown();
+-
+- int lookup_object(std::string pool_name, std::string object_name);
+-
+- int init_cache(std::string vol_name, uint64_t vol_size);
+-
+- int lock_cache(std::string vol_name);
+-
+- private:
+- void evict_thread_body();
+- int evict_objects();
+-
+- int do_promote(std::string pool_name, std::string object_name);
+-
+- int promote_object(librados::IoCtx*, std::string object_name,
+- librados::bufferlist* read_buf,
+- uint64_t length);
+-
+- CephContext *m_cct;
+- ContextWQ* m_work_queue;
+- RadosRef m_rados;
+-
+-
+- std::map<std::string, librados::IoCtx*> m_ioctxs;
+-
+- librbd::cache::SyncFile *m_cache_file;
+-
+- Policy* m_policy;
+- std::thread* evict_thd;
+- bool m_evict_go = false;
+-};
+-
+-} // namespace rbd
+-} // namespace cache
+-#endif
+diff --git a/src/tools/rbd_cache/Policy.hpp b/src/tools/rbd_cache/Policy.hpp
+deleted file mode 100644
+index 711e3bd..0000000
+--- a/src/tools/rbd_cache/Policy.hpp
++++ /dev/null
+@@ -1,30 +0,0 @@
+-#ifndef RBD_CACHE_POLICY_HPP
+-#define RBD_CACHE_POLICY_HPP
+-
+-#include <list>
+-#include <string>
+-
+-namespace rbd {
+-namespace cache {
+-
+-enum CACHESTATUS {
+- OBJ_CACHE_NONE = 0,
+- OBJ_CACHE_PROMOTING,
+- OBJ_CACHE_PROMOTED,
+-};
+-
+-
+-class Policy {
+-public:
+- Policy(){}
+- virtual ~Policy(){};
+- virtual CACHESTATUS lookup_object(std::string) = 0;
+- virtual int evict_object(std::string&) = 0;
+- virtual void update_status(std::string, CACHESTATUS) = 0;
+- virtual CACHESTATUS get_status(std::string) = 0;
+- virtual void get_evict_list(std::list<std::string>* obj_list) = 0;
+-};
+-
+-} // namespace cache
+-} // namespace rbd
+-#endif
+diff --git a/src/tools/rbd_cache/SimplePolicy.hpp b/src/tools/rbd_cache/SimplePolicy.hpp
+deleted file mode 100644
+index e785de1..0000000
+--- a/src/tools/rbd_cache/SimplePolicy.hpp
++++ /dev/null
+@@ -1,160 +0,0 @@
+-#ifndef RBD_CACHE_SIMPLE_POLICY_HPP
+-#define RBD_CACHE_SIMPLE_POLICY_HPP
+-
+-#include "Policy.hpp"
+-#include "include/lru.h"
+-#include "common/RWLock.h"
+-#include "common/Mutex.h"
+-
+-#include <vector>
+-#include <unordered_map>
+-#include <string>
+-
+-namespace rbd {
+-namespace cache {
+-
+-
+-class SimplePolicy : public Policy {
+-public:
+- SimplePolicy(uint64_t block_num, float watermark)
+- : m_watermark(watermark), m_entry_count(block_num),
+- m_cache_map_lock("rbd::cache::SimplePolicy::m_cache_map_lock"),
+- m_free_list_lock("rbd::cache::SimplePolicy::m_free_list_lock")
+- {
+-
+- for(uint64_t i = 0; i < m_entry_count; i++) {
+- m_free_list.push_back(new Entry());
+- }
+-
+- }
+-
+- ~SimplePolicy() {
+- for(uint64_t i = 0; i < m_entry_count; i++) {
+- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
+- delete entry;
+- m_free_list.pop_front();
+- }
+- }
+-
+- CACHESTATUS lookup_object(std::string cache_file_name) {
+-
+- //TODO(): check race condition
+- RWLock::WLocker wlocker(m_cache_map_lock);
+-
+- auto entry_it = m_cache_map.find(cache_file_name);
+- if(entry_it == m_cache_map.end()) {
+- Mutex::Locker locker(m_free_list_lock);
+- Entry* entry = reinterpret_cast<Entry*>(m_free_list.front());
+- assert(entry != nullptr);
+- m_free_list.pop_front();
+- entry->status = OBJ_CACHE_PROMOTING;
+-
+- m_cache_map[cache_file_name] = entry;
+-
+- return OBJ_CACHE_NONE;
+- }
+-
+- Entry* entry = entry_it->second;
+-
+- if(entry->status == OBJ_CACHE_PROMOTED) {
+- // touch it
+- m_promoted_lru.lru_touch(entry);
+- }
+-
+- return entry->status;
+- }
+-
+- int evict_object(std::string& out_cache_file_name) {
+- RWLock::WLocker locker(m_cache_map_lock);
+-
+- return 1;
+- }
+-
+- // TODO(): simplify the logic
+- void update_status(std::string file_name, CACHESTATUS new_status) {
+- RWLock::WLocker locker(m_cache_map_lock);
+-
+- Entry* entry;
+- auto entry_it = m_cache_map.find(file_name);
+-
+- // just check.
+- if(new_status == OBJ_CACHE_PROMOTING) {
+- assert(entry_it == m_cache_map.end());
+- }
+-
+- assert(entry_it != m_cache_map.end());
+-
+- entry = entry_it->second;
+-
+- // promoting is done, so update it.
+- if(entry->status == OBJ_CACHE_PROMOTING && new_status== OBJ_CACHE_PROMOTED) {
+- m_promoted_lru.lru_insert_top(entry);
+- entry->status = new_status;
+- return;
+- }
+-
+- assert(0);
+- }
+-
+- // get entry status
+- CACHESTATUS get_status(std::string file_name) {
+- RWLock::RLocker locker(m_cache_map_lock);
+- auto entry_it = m_cache_map.find(file_name);
+- if(entry_it == m_cache_map.end()) {
+- return OBJ_CACHE_NONE;
+- }
+-
+- return entry_it->second->status;
+- }
+-
+- void get_evict_list(std::list<std::string>* obj_list) {
+- RWLock::WLocker locker(m_cache_map_lock);
+- // check free ratio, pop entries from LRU
+- if (m_free_list.size() / m_entry_count < m_watermark) {
+- int evict_num = 10; //TODO(): make this configurable
+- for(int i = 0; i < evict_num; i++) {
+- Entry* entry = reinterpret_cast<Entry*>(m_promoted_lru.lru_expire());
+- if (entry == nullptr) {
+- continue;
+- }
+- std::string file_name = entry->cache_file_name;
+- obj_list->push_back(file_name);
+-
+- auto entry_it = m_cache_map.find(file_name);
+- m_cache_map.erase(entry_it);
+-
+- //mark this entry as free
+- entry->status = OBJ_CACHE_NONE;
+- Mutex::Locker locker(m_free_list_lock);
+- m_free_list.push_back(entry);
+- }
+- }
+- }
+-
+-private:
+-
+- class Entry : public LRUObject {
+- public:
+- CACHESTATUS status;
+- Entry() : status(OBJ_CACHE_NONE){}
+- std::string cache_file_name;
+- void encode(bufferlist &bl){}
+- void decode(bufferlist::iterator &it){}
+- };
+-
+- float m_watermark;
+- uint64_t m_entry_count;
+-
+- std::unordered_map<std::string, Entry*> m_cache_map;
+- RWLock m_cache_map_lock;
+-
+- std::deque<Entry*> m_free_list;
+- Mutex m_free_list_lock;
+-
+- LRU m_promoted_lru; // include promoted, using status.
+-
+-};
+-
+-} // namespace cache
+-} // namespace rbd
+-#endif
+diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc
+deleted file mode 100644
+index d604760..0000000
+--- a/src/tools/rbd_cache/main.cc
++++ /dev/null
+@@ -1,85 +0,0 @@
+-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+-// vim: ts=8 sw=2 smarttab
+-
+-#include "common/ceph_argparse.h"
+-#include "common/config.h"
+-#include "common/debug.h"
+-#include "common/errno.h"
+-#include "global/global_init.h"
+-#include "global/signal_handler.h"
+-#include "CacheController.h"
+-
+-#include <vector>
+-
+-rbd::cache::CacheController *cachectl = nullptr;
+-
+-void usage() {
+- std::cout << "usage: cache controller [options...]" << std::endl;
+- std::cout << "options:\n";
+- std::cout << " -m monaddress[:port] connect to specified monitor\n";
+- std::cout << " --keyring=<path> path to keyring for local cluster\n";
+- std::cout << " --log-file=<logfile> file to log debug output\n";
+- std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n";
+- generic_server_usage();
+-}
+-
+-static void handle_signal(int signum)
+-{
+- if (cachectl)
+- cachectl->handle_signal(signum);
+-}
+-
+-int main(int argc, const char **argv)
+-{
+- std::vector<const char*> args;
+- env_to_vec(args);
+- argv_to_vec(argc, argv, args);
+-
+- auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
+- CODE_ENVIRONMENT_DAEMON,
+- CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
+-
+- for (auto i = args.begin(); i != args.end(); ++i) {
+- if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
+- usage();
+- return EXIT_SUCCESS;
+- }
+- }
+-
+- if (g_conf()->daemonize) {
+- global_init_daemonize(g_ceph_context);
+- }
+- g_ceph_context->enable_perf_counter();
+-
+- common_init_finish(g_ceph_context);
+-
+- init_async_signal_handler();
+- register_async_signal_handler(SIGHUP, sighup_handler);
+- register_async_signal_handler_oneshot(SIGINT, handle_signal);
+- register_async_signal_handler_oneshot(SIGTERM, handle_signal);
+-
+- std::vector<const char*> cmd_args;
+- argv_to_vec(argc, argv, cmd_args);
+-
+- // disable unnecessary librbd cache
+- g_ceph_context->_conf.set_val_or_die("rbd_cache", "false");
+-
+- cachectl = new rbd::cache::CacheController(g_ceph_context, cmd_args);
+- int r = cachectl->init();
+- if (r < 0) {
+- std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
+- goto cleanup;
+- }
+-
+- cachectl->run();
+-
+- cleanup:
+- unregister_async_signal_handler(SIGHUP, sighup_handler);
+- unregister_async_signal_handler(SIGINT, handle_signal);
+- unregister_async_signal_handler(SIGTERM, handle_signal);
+- shutdown_async_signal_handler();
+-
+- delete cachectl;
+-
+- return r < 0 ? EXIT_SUCCESS : EXIT_FAILURE;
+-}
+--
+2.7.4
+