Diffstat (limited to 'src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch')
-rw-r--r--  src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch  1685
1 file changed, 1685 insertions, 0 deletions
diff --git a/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch b/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch
new file mode 100644
index 0000000..0476086
--- /dev/null
+++ b/src/ceph/0001-librbd-shared-persistent-read-only-rbd-cache.patch
@@ -0,0 +1,1685 @@
+From b7b81562c76011abe05930330915a5ba423964e4 Mon Sep 17 00:00:00 2001
+From: Yuan Zhou <yuan.zhou@intel.com>
+Date: Thu, 19 Apr 2018 22:54:36 +0800
+Subject: [PATCH 01/10] librbd: shared persistent read-only rbd cache
+
+This patch introduces an RBD shared persistent read-only (RO) cache, which
+provides a client-side shared cache for the rbd clone/snapshot case.
+
+The key components are:
+
+- An RBD cache daemon runs on each compute node to control the shared cache
+  state (an example configuration is shown after this list)
+
+- Read-only blocks from parent image(s) are cached in a shared area on
+ compute node(s)
+
+- An object-level dispatcher inside librbd that does RPC with the cache daemon
+  to look up the cache
+
+- Reads are served from the shared cache until the first COW request
+
+- Policy to control promotion/eviction of the shared cache
+
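+To make the setup concrete, a compute node would be configured roughly as
+below. The two config options are the ones added by this patch, and the
+rbd-cache binary and socket path come from this series; the [client] section
+placement and the SSD mount point are illustrative assumptions, not part of
+the patch:
+
+  # ceph.conf on the compute node (client side)
+  [client]
+      rbd shared cache enabled = true
+      rbd shared cache path = /mnt/ssd/rbd_cache   # illustrative path; default is /tmp
+
+  # one rbd-cache daemon per compute node; it serves lookup RPCs over the
+  # UNIX domain socket /tmp/rbd_shared_readonly_cache_demo
+  $ rbd-cache
+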
+The general IO flow is:
+
+0) Parent images register themselves when initializing
+
+1) When a read request on a cloned image flows to the parent image, it checks
+   with the cache daemon whether the target object is ready
+
+2) The cache daemon receives the lookup request:
+   a) if the target object is promoted, the daemon acks with "read_from_cache"
+   b) if it is not promoted, the daemon checks the policy on whether to promote:
+      - if yes, the daemon does the promotion and then acks with "read_from_cache"
+      - if no, the daemon acks with "read_from_rados"
+
+3) The read request continues to read from the cache or from RADOS, based on
+   the ack (a short standalone sketch of this handshake follows below)
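+
+The handshake above can be summarized in a short standalone C++ sketch. The
+message-type constants mirror those defined in CacheControllerSocketCommon.h
+in this patch; daemon_ack() and serve_read() are hypothetical stand-ins for
+the daemon's lookup/policy path and the librbd dispatcher's read path, not
+code taken from this series:
+
+  #include <cstdint>
+  #include <iostream>
+  #include <string>
+
+  // message types mirroring CacheControllerSocketCommon.h
+  constexpr uint8_t RBDSC_READ       = 0X12;
+  constexpr uint8_t RBDSC_READ_REPLY = 0X15;
+  constexpr uint8_t RBDSC_READ_RADOS = 0X17;
+
+  // hypothetical stand-in for the daemon: a promoted object is acked with
+  // "read_from_cache", everything else with "read_from_rados"
+  uint8_t daemon_ack(bool promoted) {
+    return promoted ? RBDSC_READ_REPLY : RBDSC_READ_RADOS;
+  }
+
+  // hypothetical stand-in for the librbd object dispatcher in step 3)
+  std::string serve_read(uint8_t ack) {
+    return ack == RBDSC_READ_REPLY ? "read from the shared cache file"
+                                   : "continue the read from RADOS";
+  }
+
+  int main() {
+    uint8_t ack = daemon_ack(/*promoted=*/true);  // steps 1) and 2)
+    std::cout << serve_read(ack) << std::endl;    // step 3)
+    return 0;
+  }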
+
+Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
+---
+ src/common/options.cc | 8 ++
+ src/librbd/CMakeLists.txt | 4 +-
+ src/librbd/ImageCtx.cc | 5 +-
+ src/librbd/ImageCtx.h | 3 +
+ src/librbd/cache/SharedPersistentObjectCacher.cc | 61 ++++++++
+ src/librbd/cache/SharedPersistentObjectCacher.h | 45 ++++++
+ .../SharedPersistentObjectCacherObjectDispatch.cc | 154 +++++++++++++++++++++
+ .../SharedPersistentObjectCacherObjectDispatch.h | 127 +++++++++++++++++
+ src/librbd/image/OpenRequest.cc | 12 +-
+ src/librbd/io/Types.h | 1 +
+ src/os/CacheStore/SyncFile.cc | 110 +++++++++++++++
+ src/os/CacheStore/SyncFile.h | 74 ++++++++++
+ src/test/librbd/test_mirroring.cc | 1 +
+ src/test/rbd_mirror/test_ImageReplayer.cc | 2 +
+ src/test/rbd_mirror/test_fixture.cc | 1 +
+ src/tools/CMakeLists.txt | 1 +
+ src/tools/rbd_cache/CMakeLists.txt | 9 ++
+ src/tools/rbd_cache/CacheController.cc | 105 ++++++++++++++
+ src/tools/rbd_cache/CacheController.hpp | 49 +++++++
+ src/tools/rbd_cache/CacheControllerSocket.hpp | 125 +++++++++++++++++
+ .../rbd_cache/CacheControllerSocketClient.hpp | 131 ++++++++++++++++++
+ src/tools/rbd_cache/CacheControllerSocketCommon.h | 43 ++++++
+ src/tools/rbd_cache/ObjectCacheStore.cc | 147 ++++++++++++++++++++
+ src/tools/rbd_cache/ObjectCacheStore.h | 65 +++++++++
+ src/tools/rbd_cache/main.cc | 85 ++++++++++++
+ 25 files changed, 1365 insertions(+), 3 deletions(-)
+ create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.cc
+ create mode 100644 src/librbd/cache/SharedPersistentObjectCacher.h
+ create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+ create mode 100644 src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+ create mode 100644 src/os/CacheStore/SyncFile.cc
+ create mode 100644 src/os/CacheStore/SyncFile.h
+ create mode 100644 src/tools/rbd_cache/CMakeLists.txt
+ create mode 100644 src/tools/rbd_cache/CacheController.cc
+ create mode 100644 src/tools/rbd_cache/CacheController.hpp
+ create mode 100644 src/tools/rbd_cache/CacheControllerSocket.hpp
+ create mode 100644 src/tools/rbd_cache/CacheControllerSocketClient.hpp
+ create mode 100644 src/tools/rbd_cache/CacheControllerSocketCommon.h
+ create mode 100644 src/tools/rbd_cache/ObjectCacheStore.cc
+ create mode 100644 src/tools/rbd_cache/ObjectCacheStore.h
+ create mode 100644 src/tools/rbd_cache/main.cc
+
+diff --git a/src/common/options.cc b/src/common/options.cc
+index c5afe4c..7839a31 100644
+--- a/src/common/options.cc
++++ b/src/common/options.cc
+@@ -6357,6 +6357,14 @@ static std::vector<Option> get_rbd_options() {
+ .set_default(60)
+ .set_description("time in seconds for detecting a hung thread"),
+
++ Option("rbd_shared_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
++ .set_default(true)
++ .set_description("whether to enable shared ssd caching"),
++
++ Option("rbd_shared_cache_path", Option::TYPE_STR, Option::LEVEL_ADVANCED)
++ .set_default("/tmp")
++ .set_description("shared ssd caching data dir"),
++
+ Option("rbd_non_blocking_aio", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+ .set_default(true)
+ .set_description("process AIO ops from a dispatch thread to prevent blocking"),
+diff --git a/src/librbd/CMakeLists.txt b/src/librbd/CMakeLists.txt
+index b9c08d4..92539a8 100644
+--- a/src/librbd/CMakeLists.txt
++++ b/src/librbd/CMakeLists.txt
+@@ -32,7 +32,8 @@ set(librbd_internal_srcs
+ api/Snapshot.cc
+ cache/ImageWriteback.cc
+ cache/ObjectCacherObjectDispatch.cc
+- cache/PassthroughImageCache.cc
++ cache/SharedPersistentObjectCacherObjectDispatch.cc
++ cache/SharedPersistentObjectCacher.cc
+ deep_copy/ImageCopyRequest.cc
+ deep_copy/MetadataCopyRequest.cc
+ deep_copy/ObjectCopyRequest.cc
+@@ -123,6 +124,7 @@ set(librbd_internal_srcs
+ trash/MoveRequest.cc
+ watcher/Notifier.cc
+ watcher/RewatchRequest.cc
++ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc
+ ${CMAKE_SOURCE_DIR}/src/common/ContextCompletion.cc)
+
+ add_library(rbd_api STATIC librbd.cc)
+diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc
+index 48f98b1..349156b 100644
+--- a/src/librbd/ImageCtx.cc
++++ b/src/librbd/ImageCtx.cc
+@@ -776,7 +776,8 @@ public:
+ "rbd_qos_read_iops_limit", false)(
+ "rbd_qos_write_iops_limit", false)(
+ "rbd_qos_read_bps_limit", false)(
+- "rbd_qos_write_bps_limit", false);
++ "rbd_qos_write_bps_limit", false)(
++ "rbd_shared_cache_enabled", false);
+
+ ConfigProxy local_config_t{false};
+ std::map<std::string, bufferlist> res;
+@@ -844,6 +845,8 @@ public:
+ ASSIGN_OPTION(qos_write_iops_limit, uint64_t);
+ ASSIGN_OPTION(qos_read_bps_limit, uint64_t);
+ ASSIGN_OPTION(qos_write_bps_limit, uint64_t);
++ ASSIGN_OPTION(shared_cache_enabled, bool);
++ ASSIGN_OPTION(shared_cache_path, std::string);
+
+ if (thread_safe) {
+ ASSIGN_OPTION(journal_pool, std::string);
+diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h
+index d197c24..f661c09 100644
+--- a/src/librbd/ImageCtx.h
++++ b/src/librbd/ImageCtx.h
+@@ -204,6 +204,9 @@ namespace librbd {
+ uint64_t qos_read_bps_limit;
+ uint64_t qos_write_bps_limit;
+
++ bool shared_cache_enabled;
++ std::string shared_cache_path;
++
+ LibrbdAdminSocketHook *asok_hook;
+
+ exclusive_lock::Policy *exclusive_lock_policy = nullptr;
+diff --git a/src/librbd/cache/SharedPersistentObjectCacher.cc b/src/librbd/cache/SharedPersistentObjectCacher.cc
+new file mode 100644
+index 0000000..a849260
+--- /dev/null
++++ b/src/librbd/cache/SharedPersistentObjectCacher.cc
+@@ -0,0 +1,61 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "librbd/cache/SharedPersistentObjectCacher.h"
++#include "include/buffer.h"
++#include "common/dout.h"
++#include "librbd/ImageCtx.h"
++
++#define dout_subsys ceph_subsys_rbd
++#undef dout_prefix
++#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacher: " << this \
++ << " " << __func__ << ": "
++
++namespace librbd {
++namespace cache {
++
++template <typename I>
++SharedPersistentObjectCacher<I>::SharedPersistentObjectCacher(I *image_ctx, std::string cache_path)
++ : m_image_ctx(image_ctx), m_cache_path(cache_path),
++ m_file_map_lock("librbd::cache::SharedObjectCacher::filemaplock") {
++ auto *cct = m_image_ctx->cct;
++
++}
++
++template <typename I>
++SharedPersistentObjectCacher<I>::~SharedPersistentObjectCacher() {
++ for(auto &it: file_map) {
++ if(it.second) {
++ delete it.second;
++ }
++ }
++}
++
++template <typename I>
++int SharedPersistentObjectCacher<I>::read_object(std::string oid, ceph::bufferlist* read_data, uint64_t offset, uint64_t length, Context *on_finish) {
++
++ auto *cct = m_image_ctx->cct;
++ ldout(cct, 20) << "object: " << oid << dendl;
++
++ std::string cache_file_name = m_image_ctx->data_ctx.get_pool_name() + oid;
++
++ //TODO(): make a cache for cachefile fd
++ os::CacheStore::SyncFile* target_cache_file = new os::CacheStore::SyncFile(cct, cache_file_name);
++ target_cache_file->open();
++
++ int ret = target_cache_file->read_object_from_file(read_data, offset, length);
++ if (ret < 0) {
++ ldout(cct, 5) << "read from file return error: " << ret
++ << "file name= " << cache_file_name
++ << dendl;
++ }
++
++ delete target_cache_file;
++ return ret;
++}
++
++
++} // namespace cache
++} // namespace librbd
++
++template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>;
+diff --git a/src/librbd/cache/SharedPersistentObjectCacher.h b/src/librbd/cache/SharedPersistentObjectCacher.h
+new file mode 100644
+index 0000000..d108a05
+--- /dev/null
++++ b/src/librbd/cache/SharedPersistentObjectCacher.h
+@@ -0,0 +1,45 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER
++#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER
++
++#include "include/buffer_fwd.h"
++#include "include/int_types.h"
++#include "os/CacheStore/SyncFile.h"
++#include "common/Mutex.h"
++#include <vector>
++#include <map>
++
++struct Context;
++
++namespace librbd {
++
++struct ImageCtx;
++
++namespace cache {
++
++template <typename ImageCtxT>
++class SharedPersistentObjectCacher {
++public:
++
++ SharedPersistentObjectCacher(ImageCtxT *image_ctx, std::string cache_path);
++ ~SharedPersistentObjectCacher();
++
++ int read_object(std::string oid, ceph::bufferlist* read_data,
++ uint64_t offset, uint64_t length, Context *on_finish);
++
++private:
++ ImageCtxT *m_image_ctx;
++ std::map<std::string, os::CacheStore::SyncFile*> file_map;
++ Mutex m_file_map_lock;
++ std::string m_cache_path;
++
++};
++
++} // namespace cache
++} // namespace librbd
++
++extern template class librbd::cache::SharedPersistentObjectCacher<librbd::ImageCtx>;
++
++#endif // CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+new file mode 100644
+index 0000000..90d886c
+--- /dev/null
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+@@ -0,0 +1,154 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.h"
++#include "common/WorkQueue.h"
++#include "librbd/ImageCtx.h"
++#include "librbd/Journal.h"
++#include "librbd/Utils.h"
++#include "librbd/LibrbdWriteback.h"
++#include "librbd/io/ObjectDispatchSpec.h"
++#include "librbd/io/ObjectDispatcher.h"
++#include "librbd/io/Utils.h"
++#include "osd/osd_types.h"
++#include "osdc/WritebackHandler.h"
++#include <vector>
++
++#define dout_subsys ceph_subsys_rbd
++#undef dout_prefix
++#define dout_prefix *_dout << "librbd::cache::SharedPersistentObjectCacherObjectDispatch: " \
++ << this << " " << __func__ << ": "
++
++namespace librbd {
++namespace cache {
++
++template <typename I>
++SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjectDispatch(
++ I* image_ctx) : m_image_ctx(image_ctx) {
++}
++
++template <typename I>
++SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
++ if (m_object_store) {
++ delete m_object_store;
++ }
++
++ if (m_cache_client) {
++ delete m_cache_client;
++ }
++}
++
++template <typename I>
++void SharedPersistentObjectCacherObjectDispatch<I>::init() {
++ auto cct = m_image_ctx->cct;
++ ldout(cct, 5) << dendl;
++
++ if (m_image_ctx->parent != nullptr) {
++ //TODO(): should we cover multi-leveled clone?
++ ldout(cct, 5) << "child image: skipping SRO cache client" << dendl;
++ return;
++ }
++
++ ldout(cct, 20) << "parent image: setup SRO cache client = " << dendl;
++
++ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
++ m_cache_client = new CacheClient(io_service, controller_path.c_str(),
++ ([&](std::string s){client_handle_request(s);}));
++
++ int ret = m_cache_client->connect();
++ if (ret < 0) {
++ ldout(cct, 5) << "SRO cache client fail to connect with local controller: "
++ << "please start rbd-cache daemon"
++ << dendl;
++ } else {
++ ldout(cct, 5) << "SRO cache client to register volume on rbd-cache daemon: "
++ << "name = " << m_image_ctx->id
++ << dendl;
++
++ ret = m_cache_client->register_volume(m_image_ctx->data_ctx.get_pool_name(),
++ m_image_ctx->id, m_image_ctx->size);
++
++ if (ret >= 0) {
++ // add ourself to the IO object dispatcher chain
++ m_image_ctx->io_object_dispatcher->register_object_dispatch(this);
++ }
++ }
++}
++
++template <typename I>
++bool SharedPersistentObjectCacherObjectDispatch<I>::read(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ uint64_t object_len, librados::snap_t snap_id, int op_flags,
++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
++ io::ExtentMap* extent_map, int* object_dispatch_flags,
++ io::DispatchResult* dispatch_result, Context** on_finish,
++ Context* on_dispatched) {
++ // IO chained in reverse order
++ auto cct = m_image_ctx->cct;
++ ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
++ << object_len << dendl;
++
++ // ensure we aren't holding the cache lock post-read
++ on_dispatched = util::create_async_context_callback(*m_image_ctx,
++ on_dispatched);
++
++ if (m_cache_client && m_cache_client->connected && m_object_store) {
++ bool exists;
++ m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
++ m_image_ctx->id, oid, &exists);
++
++ // try to read from parent image
++ ldout(cct, 20) << "SRO cache object exists:" << exists << dendl;
++ if (exists) {
++ int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
++ if (r != 0) {
++ *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
++ on_dispatched->complete(r);
++ return true;
++ }
++ }
++ }
++
++ ldout(cct, 20) << "Continue read from RADOS" << dendl;
++ *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
++ on_dispatched->complete(0);
++ return true;
++}
++
++template <typename I>
++void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
++ auto cct = m_image_ctx->cct;
++ ldout(cct, 20) << dendl;
++
++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
++
++ switch (io_ctx->type) {
++ case RBDSC_REGISTER_REPLY: {
++ // open cache handler for volume
++ ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
++ m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
++
++ break;
++ }
++ case RBDSC_READ_REPLY: {
++ ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
++ //TODO(): should call read here
++
++ break;
++ }
++ case RBDSC_READ_RADOS: {
++ ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
++ //TODO(): should call read here
++
++ break;
++ }
++ default: ldout(cct, 20) << "nothing" << dendl;
++ break;
++
++ }
++}
++
++} // namespace cache
++} // namespace librbd
++
++template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
+diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+new file mode 100644
+index 0000000..1ede804
+--- /dev/null
++++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+@@ -0,0 +1,127 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
++#define CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
++
++#include "librbd/io/ObjectDispatchInterface.h"
++#include "common/Mutex.h"
++#include "osdc/ObjectCacher.h"
++#include "tools/rbd_cache/CacheControllerSocketClient.hpp"
++#include "SharedPersistentObjectCacher.h"
++
++struct WritebackHandler;
++
++namespace librbd {
++
++class ImageCtx;
++
++namespace cache {
++
++/**
++ * Facade around the OSDC object cacher to make it align with
++ * the object dispatcher interface
++ */
++template <typename ImageCtxT = ImageCtx>
++class SharedPersistentObjectCacherObjectDispatch : public io::ObjectDispatchInterface {
++public:
++ static SharedPersistentObjectCacherObjectDispatch* create(ImageCtxT* image_ctx) {
++ return new SharedPersistentObjectCacherObjectDispatch(image_ctx);
++ }
++
++ SharedPersistentObjectCacherObjectDispatch(ImageCtxT* image_ctx);
++ ~SharedPersistentObjectCacherObjectDispatch() override;
++
++ io::ObjectDispatchLayer get_object_dispatch_layer() const override {
++ return io::OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE;
++ }
++
++ void init();
++ void shut_down(Context* on_finish) {
++ m_image_ctx->op_work_queue->queue(on_finish, 0);
++ }
++
++ bool read(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ uint64_t object_len, librados::snap_t snap_id, int op_flags,
++ const ZTracer::Trace &parent_trace, ceph::bufferlist* read_data,
++ io::ExtentMap* extent_map, int* object_dispatch_flags,
++ io::DispatchResult* dispatch_result, Context** on_finish,
++ Context* on_dispatched) override;
++
++ bool discard(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ uint64_t object_len, const ::SnapContext &snapc, int discard_flags,
++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
++ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
++ Context** on_finish, Context* on_dispatched) {
++ return false;
++ }
++
++ bool write(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
++ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
++ Context** on_finish, Context* on_dispatched) {
++ return false;
++ }
++
++ bool write_same(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ uint64_t object_len, io::Extents&& buffer_extents,
++ ceph::bufferlist&& data, const ::SnapContext &snapc, int op_flags,
++ const ZTracer::Trace &parent_trace, int* object_dispatch_flags,
++ uint64_t* journal_tid, io::DispatchResult* dispatch_result,
++ Context** on_finish, Context* on_dispatched) {
++ return false;
++ }
++
++ bool compare_and_write(
++ const std::string &oid, uint64_t object_no, uint64_t object_off,
++ ceph::bufferlist&& cmp_data, ceph::bufferlist&& write_data,
++ const ::SnapContext &snapc, int op_flags,
++ const ZTracer::Trace &parent_trace, uint64_t* mismatch_offset,
++ int* object_dispatch_flags, uint64_t* journal_tid,
++ io::DispatchResult* dispatch_result, Context** on_finish,
++ Context* on_dispatched) {
++ return false;
++ }
++
++ bool flush(
++ io::FlushSource flush_source, const ZTracer::Trace &parent_trace,
++ io::DispatchResult* dispatch_result, Context** on_finish,
++ Context* on_dispatched) {
++ return false;
++ }
++
++ bool invalidate_cache(Context* on_finish) {
++ return false;
++ }
++
++ bool reset_existence_cache(Context* on_finish) {
++ return false;
++ }
++
++ void extent_overwritten(
++ uint64_t object_no, uint64_t object_off, uint64_t object_len,
++ uint64_t journal_tid, uint64_t new_journal_tid) {
++ }
++
++ SharedPersistentObjectCacher<ImageCtxT> *m_object_store = nullptr;
++
++private:
++
++ ImageCtxT* m_image_ctx;
++
++ void client_handle_request(std::string msg);
++ CacheClient *m_cache_client = nullptr;
++ boost::asio::io_service io_service;
++};
++
++} // namespace cache
++} // namespace librbd
++
++extern template class librbd::cache::SharedPersistentObjectCacherObjectDispatch<librbd::ImageCtx>;
++
++#endif // CEPH_LIBRBD_CACHE_SHARED_PERSISTENT_OBJECT_CACHER_OBJECT_DISPATCH_H
+diff --git a/src/librbd/image/OpenRequest.cc b/src/librbd/image/OpenRequest.cc
+index ae18739..30a7b66 100644
+--- a/src/librbd/image/OpenRequest.cc
++++ b/src/librbd/image/OpenRequest.cc
+@@ -8,6 +8,7 @@
+ #include "librbd/ImageCtx.h"
+ #include "librbd/Utils.h"
+ #include "librbd/cache/ObjectCacherObjectDispatch.h"
++#include "librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc"
+ #include "librbd/image/CloseRequest.h"
+ #include "librbd/image/RefreshRequest.h"
+ #include "librbd/image/SetSnapRequest.h"
+@@ -448,12 +449,21 @@ Context *OpenRequest<I>::handle_refresh(int *result) {
+
+ template <typename I>
+ Context *OpenRequest<I>::send_init_cache(int *result) {
++
++ CephContext *cct = m_image_ctx->cct;
+ // cache is disabled or parent image context
+ if (!m_image_ctx->cache || m_image_ctx->child != nullptr) {
++
++ // enable Shared Read-only cache for parent image
++ if (m_image_ctx->child != nullptr && m_image_ctx->shared_cache_enabled ) {
++ ldout(cct, 10) << this << " " << "setting up parent cache"<< dendl;
++ auto sro_cache = cache::SharedPersistentObjectCacherObjectDispatch<I>::create(m_image_ctx);
++ sro_cache->init();
++ }
++
+ return send_register_watch(result);
+ }
+
+- CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 10) << this << " " << __func__ << dendl;
+
+ auto cache = cache::ObjectCacherObjectDispatch<I>::create(m_image_ctx);
+diff --git a/src/librbd/io/Types.h b/src/librbd/io/Types.h
+index 7e09c90..ef3049f 100644
+--- a/src/librbd/io/Types.h
++++ b/src/librbd/io/Types.h
+@@ -59,6 +59,7 @@ enum DispatchResult {
+ enum ObjectDispatchLayer {
+ OBJECT_DISPATCH_LAYER_NONE = 0,
+ OBJECT_DISPATCH_LAYER_CACHE,
++ OBJECT_DISPATCH_LAYER_SHARED_PERSISTENT_CACHE,
+ OBJECT_DISPATCH_LAYER_JOURNAL,
+ OBJECT_DISPATCH_LAYER_CORE,
+ OBJECT_DISPATCH_LAYER_LAST
+diff --git a/src/os/CacheStore/SyncFile.cc b/src/os/CacheStore/SyncFile.cc
+new file mode 100644
+index 0000000..5352bde
+--- /dev/null
++++ b/src/os/CacheStore/SyncFile.cc
+@@ -0,0 +1,110 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "os/CacheStore/SyncFile.h"
++#include "include/Context.h"
++#include "common/dout.h"
++#include "common/WorkQueue.h"
++#include "librbd/ImageCtx.h"
++#include <sys/types.h>
++#include <sys/stat.h>
++#include <aio.h>
++#include <errno.h>
++#include <fcntl.h>
++#include <utility>
++
++#define dout_subsys ceph_subsys_rbd
++#undef dout_prefix
++#define dout_prefix *_dout << "librbd::file::SyncFile: " << this << " " \
++ << __func__ << ": "
++
++namespace os {
++namespace CacheStore {
++
++SyncFile::SyncFile(CephContext *cct, const std::string &name)
++ : cct(cct)
++{
++ m_name = cct->_conf->get_val<std::string>("rbd_shared_cache_path") + "/rbd_cache." + name;
++ ldout(cct, 20) << "file path=" << m_name << dendl;
++}
++
++SyncFile::~SyncFile()
++{
++ // TODO force proper cleanup
++ if (m_fd != -1) {
++ ::close(m_fd);
++ }
++}
++
++void SyncFile::open(Context *on_finish)
++{
++ while (true) {
++ m_fd = ::open(m_name.c_str(), O_CREAT | O_DIRECT | O_NOATIME | O_RDWR | O_SYNC,
++ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
++ if (m_fd == -1) {
++ int r = -errno;
++ if (r == -EINTR) {
++ continue;
++ }
++ on_finish->complete(r);
++ return;
++ }
++ break;
++ }
++
++ on_finish->complete(0);
++}
++
++void SyncFile::open()
++{
++ while (true)
++ {
++ m_fd = ::open(m_name.c_str(), O_CREAT | O_NOATIME | O_RDWR | O_SYNC,
++ S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
++ if (m_fd == -1)
++ {
++ int r = -errno;
++ if (r == -EINTR) {
++ continue;
++ }
++ return;
++ }
++ break;
++ }
++}
++
++int SyncFile::write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len) {
++
++ ldout(cct, 20) << "cache file name:" << m_name
++ << ", length:" << object_len << dendl;
++
++ // TODO(): aio
++ int ret = pwrite(m_fd, read_buf.c_str(), object_len, 0);
++ if(ret < 0) {
++ lderr(cct)<<"write file fail:" << std::strerror(errno) << dendl;
++ return ret;
++ }
++
++ return ret;
++}
++
++int SyncFile::read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len) {
++
++ ldout(cct, 20) << "offset:" << object_off
++ << ", length:" << object_len << dendl;
++
++ bufferptr buf(object_len);
++
++ // TODO(): aio
++ int ret = pread(m_fd, buf.c_str(), object_len, object_off);
++ if(ret < 0) {
++ lderr(cct)<<"read file fail:" << std::strerror(errno) << dendl;
++ return ret;
++ }
++ read_buf->append(std::move(buf));
++
++ return ret;
++}
++
++} // namespace CacheStore
++} // namespace os
+diff --git a/src/os/CacheStore/SyncFile.h b/src/os/CacheStore/SyncFile.h
+new file mode 100644
+index 0000000..81602ce
+--- /dev/null
++++ b/src/os/CacheStore/SyncFile.h
+@@ -0,0 +1,74 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CEPH_LIBOS_CACHE_STORE_SYNC_FILE
++#define CEPH_LIBOS_CACHE_STORE_SYNC_FILE
++
++#include "include/buffer_fwd.h"
++#include <sys/mman.h>
++#include <string>
++
++struct Context;
++struct ContextWQ;
++class CephContext;
++
++namespace os {
++
++namespace CacheStore {
++
++class SyncFile {
++public:
++ SyncFile(CephContext *cct, const std::string &name);
++ ~SyncFile();
++
++ // TODO use IO queue instead of individual commands so operations can be
++ // submitted in batch
++
++ // TODO use scatter/gather API
++
++ void open(Context *on_finish);
++
++ // ##
++ void open();
++ bool try_open();
++ void close(Context *on_finish);
++ void remove(Context *on_finish);
++
++ void read(uint64_t offset, uint64_t length, ceph::bufferlist *bl, Context *on_finish);
++
++ void write(uint64_t offset, ceph::bufferlist &&bl, bool fdatasync, Context *on_finish);
++
++ void discard(uint64_t offset, uint64_t length, bool fdatasync, Context *on_finish);
++
++ void truncate(uint64_t length, bool fdatasync, Context *on_finish);
++
++ void fsync(Context *on_finish);
++
++ void fdatasync(Context *on_finish);
++
++ uint64_t filesize();
++
++ int load(void** dest, uint64_t filesize);
++
++ int remove();
++
++ // ##
++ int write_object_to_file(ceph::bufferlist read_buf, uint64_t object_len);
++ int read_object_from_file(ceph::bufferlist* read_buf, uint64_t object_off, uint64_t object_len);
++
++private:
++ CephContext *cct;
++ std::string m_name;
++ int m_fd = -1;
++
++ int write(uint64_t offset, const ceph::bufferlist &bl, bool fdatasync);
++ int read(uint64_t offset, uint64_t length, ceph::bufferlist *bl);
++ int discard(uint64_t offset, uint64_t length, bool fdatasync);
++ int truncate(uint64_t length, bool fdatasync);
++ int fdatasync();
++};
++
++} // namespace CacheStore
++} // namespace os
++
++#endif // CEPH_LIBOS_CACHE_STORE_SYNC_FILE
+diff --git a/src/test/librbd/test_mirroring.cc b/src/test/librbd/test_mirroring.cc
+index b4fdeae..d7d1aa6 100644
+--- a/src/test/librbd/test_mirroring.cc
++++ b/src/test/librbd/test_mirroring.cc
+@@ -47,6 +47,7 @@ public:
+
+ void SetUp() override {
+ ASSERT_EQ(0, _rados.ioctx_create(_pool_name.c_str(), m_ioctx));
++ ASSERT_EQ(0, _rados.conf_set("rbd_shared_cache_enabled", "false"));
+ }
+
+ std::string image_name = "mirrorimg1";
+diff --git a/src/test/rbd_mirror/test_ImageReplayer.cc b/src/test/rbd_mirror/test_ImageReplayer.cc
+index 8a95a65..b5598bd 100644
+--- a/src/test/rbd_mirror/test_ImageReplayer.cc
++++ b/src/test/rbd_mirror/test_ImageReplayer.cc
+@@ -90,6 +90,7 @@ public:
+ EXPECT_EQ("", connect_cluster_pp(*m_local_cluster.get()));
+ EXPECT_EQ(0, m_local_cluster->conf_set("rbd_cache", "false"));
+ EXPECT_EQ(0, m_local_cluster->conf_set("rbd_mirror_journal_poll_age", "1"));
++ EXPECT_EQ(0, m_local_cluster->conf_set("rbd_shared_cache_enabled", "false"));
+
+ m_local_pool_name = get_temp_pool_name();
+ EXPECT_EQ(0, m_local_cluster->pool_create(m_local_pool_name.c_str()));
+@@ -99,6 +100,7 @@ public:
+
+ EXPECT_EQ("", connect_cluster_pp(m_remote_cluster));
+ EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_cache", "false"));
++ EXPECT_EQ(0, m_remote_cluster.conf_set("rbd_shared_cache_enabled", "false"));
+
+ m_remote_pool_name = get_temp_pool_name();
+ EXPECT_EQ(0, m_remote_cluster.pool_create(m_remote_pool_name.c_str()));
+diff --git a/src/test/rbd_mirror/test_fixture.cc b/src/test/rbd_mirror/test_fixture.cc
+index b2a51ca..9e77098 100644
+--- a/src/test/rbd_mirror/test_fixture.cc
++++ b/src/test/rbd_mirror/test_fixture.cc
+@@ -27,6 +27,7 @@ void TestFixture::SetUpTestCase() {
+ _rados = std::shared_ptr<librados::Rados>(new librados::Rados());
+ ASSERT_EQ("", connect_cluster_pp(*_rados.get()));
+ ASSERT_EQ(0, _rados->conf_set("rbd_cache", "false"));
++ ASSERT_EQ(0, _rados->conf_set("rbd_shared_cache_enabled", "false"));
+
+ _local_pool_name = get_temp_pool_name("test-rbd-mirror-");
+ ASSERT_EQ(0, _rados->pool_create(_local_pool_name.c_str()));
+diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
+index 3789e3c..72ab342 100644
+--- a/src/tools/CMakeLists.txt
++++ b/src/tools/CMakeLists.txt
+@@ -99,6 +99,7 @@ endif(WITH_CEPHFS)
+ if(WITH_RBD)
+ add_subdirectory(rbd)
+ add_subdirectory(rbd_mirror)
++ add_subdirectory(rbd_cache)
+ if(LINUX)
+ add_subdirectory(rbd_nbd)
+ endif()
+diff --git a/src/tools/rbd_cache/CMakeLists.txt b/src/tools/rbd_cache/CMakeLists.txt
+new file mode 100644
+index 0000000..08eae60
+--- /dev/null
++++ b/src/tools/rbd_cache/CMakeLists.txt
+@@ -0,0 +1,9 @@
++add_executable(rbd-cache
++ ${CMAKE_SOURCE_DIR}/src/os/CacheStore/SyncFile.cc
++ ObjectCacheStore.cc
++ CacheController.cc
++ main.cc)
++target_link_libraries(rbd-cache
++ librados
++ global)
++install(TARGETS rbd-cache DESTINATION bin)
+diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
+new file mode 100644
+index 0000000..c914358
+--- /dev/null
++++ b/src/tools/rbd_cache/CacheController.cc
+@@ -0,0 +1,105 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "CacheController.hpp"
++
++#define dout_context g_ceph_context
++#define dout_subsys ceph_subsys_rbd_cache
++#undef dout_prefix
++#define dout_prefix *_dout << "rbd::cache::CacheController: " << this << " " \
++ << __func__ << ": "
++
++
++class ThreadPoolSingleton : public ThreadPool {
++public:
++ ContextWQ *op_work_queue;
++
++ explicit ThreadPoolSingleton(CephContext *cct)
++ : ThreadPool(cct, "librbd::cache::thread_pool", "tp_librbd_cache", 32,
++ "pcache_threads"),
++ op_work_queue(new ContextWQ("librbd::pcache_op_work_queue",
++ cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
++ this)) {
++ start();
++ }
++ ~ThreadPoolSingleton() override {
++ op_work_queue->drain();
++ delete op_work_queue;
++
++ stop();
++ }
++};
++
++
++CacheController::CacheController(CephContext *cct, const std::vector<const char*> &args):
++ m_args(args), m_cct(cct) {
++
++}
++
++CacheController::~CacheController() {
++
++}
++
++int CacheController::init() {
++ ThreadPoolSingleton* thread_pool_singleton = &m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
++ "rbd::cache::thread_pool", false, m_cct);
++ pcache_op_work_queue = thread_pool_singleton->op_work_queue;
++
++ m_object_cache_store = new ObjectCacheStore(m_cct, pcache_op_work_queue);
++ int r = m_object_cache_store->init(false);
++ if (r < 0) {
++ //derr << "init error\n" << dendl;
++ }
++ return r;
++}
++
++int CacheController::shutdown() {
++ int r = m_object_cache_store->shutdown();
++ return r;
++}
++
++void CacheController::handle_signal(int signum){}
++
++void CacheController::run() {
++ try {
++ //TODO(): use new socket path
++ std::string controller_path = "/tmp/rbd_shared_readonly_cache_demo";
++ std::remove(controller_path.c_str());
++
++ m_cache_server = new CacheServer(io_service, controller_path,
++ ([&](uint64_t p, std::string s){handle_request(p, s);}));
++ io_service.run();
++ } catch (std::exception& e) {
++ std::cerr << "Exception: " << e.what() << "\n";
++ }
++}
++
++void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
++ rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
++
++ int ret = 0;
++
++ switch (io_ctx->type) {
++ case RBDSC_REGISTER: {
++ // init cache layout for volume
++ m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
++ io_ctx->type = RBDSC_REGISTER_REPLY;
++ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
++
++ break;
++ }
++ case RBDSC_READ: {
++ // lookup object in local cache store
++ ret = m_object_cache_store->lookup_object(io_ctx->pool_name, io_ctx->vol_name);
++ if (ret < 0) {
++ io_ctx->type = RBDSC_READ_RADOS;
++ } else {
++ io_ctx->type = RBDSC_READ_REPLY;
++ }
++ m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
++
++ break;
++ }
++
++ }
++}
+diff --git a/src/tools/rbd_cache/CacheController.hpp b/src/tools/rbd_cache/CacheController.hpp
+new file mode 100644
+index 0000000..97113e4
+--- /dev/null
++++ b/src/tools/rbd_cache/CacheController.hpp
+@@ -0,0 +1,49 @@
++#ifndef CACHE_CONTROLLER_H
++#define CACHE_CONTROLLER_H
++
++#include <thread>
++
++#include "common/Formatter.h"
++#include "common/admin_socket.h"
++#include "common/debug.h"
++#include "common/errno.h"
++#include "common/ceph_context.h"
++#include "common/Mutex.h"
++#include "common/WorkQueue.h"
++#include "include/rados/librados.hpp"
++#include "include/rbd/librbd.h"
++#include "include/assert.h"
++#include "librbd/ImageCtx.h"
++#include "librbd/ImageState.h"
++
++#include "CacheControllerSocket.hpp"
++#include "ObjectCacheStore.h"
++
++
++using boost::asio::local::stream_protocol;
++
++class CacheController {
++ public:
++ CacheController(CephContext *cct, const std::vector<const char*> &args);
++ ~CacheController();
++
++ int init();
++
++ int shutdown();
++
++ void handle_signal(int signum);
++
++ void run();
++
++ void handle_request(uint64_t sesstion_id, std::string msg);
++
++ private:
++ boost::asio::io_service io_service;
++ CacheServer *m_cache_server;
++ std::vector<const char*> m_args;
++ CephContext *m_cct;
++ ObjectCacheStore *m_object_cache_store;
++ ContextWQ* pcache_op_work_queue;
++};
++
++#endif
+diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
+new file mode 100644
+index 0000000..6e1a743
+--- /dev/null
++++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
+@@ -0,0 +1,125 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CACHE_CONTROLLER_SOCKET_H
++#define CACHE_CONTROLLER_SOCKET_H
++
++#include <cstdio>
++#include <iostream>
++#include <array>
++#include <memory>
++#include <string>
++#include <boost/bind.hpp>
++#include <boost/asio.hpp>
++#include <boost/algorithm/string.hpp>
++#include "CacheControllerSocketCommon.h"
++
++
++using boost::asio::local::stream_protocol;
++
++class session : public std::enable_shared_from_this<session> {
++public:
++ session(uint64_t session_id, boost::asio::io_service& io_service, ProcessMsg processmsg)
++ : session_id(session_id), socket_(io_service), process_msg(processmsg) {}
++
++ stream_protocol::socket& socket() {
++ return socket_;
++ }
++
++ void start() {
++
++ boost::asio::async_read(socket_, boost::asio::buffer(data_),
++ boost::asio::transfer_exactly(544),
++ boost::bind(&session::handle_read,
++ shared_from_this(),
++ boost::asio::placeholders::error,
++ boost::asio::placeholders::bytes_transferred));
++
++ }
++
++ void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
++
++ if (!error) {
++
++ process_msg(session_id, std::string(data_, bytes_transferred));
++
++ }
++ }
++
++ void handle_write(const boost::system::error_code& error) {
++ if (!error) {
++ socket_.async_read_some(boost::asio::buffer(data_),
++ boost::bind(&session::handle_read,
++ shared_from_this(),
++ boost::asio::placeholders::error,
++ boost::asio::placeholders::bytes_transferred));
++ }
++ }
++
++ void send(std::string msg) {
++
++ boost::asio::async_write(socket_,
++ boost::asio::buffer(msg.c_str(), msg.size()),
++ boost::bind(&session::handle_write,
++ shared_from_this(),
++ boost::asio::placeholders::error));
++
++ }
++
++private:
++ uint64_t session_id;
++ stream_protocol::socket socket_;
++ ProcessMsg process_msg;
++
++ // Buffer used to store data received from the client.
++ //std::array<char, 1024> data_;
++ char data_[1024];
++};
++
++typedef std::shared_ptr<session> session_ptr;
++
++class CacheServer {
++public:
++ CacheServer(boost::asio::io_service& io_service,
++ const std::string& file, ProcessMsg processmsg)
++ : io_service_(io_service),
++ server_process_msg(processmsg),
++ acceptor_(io_service, stream_protocol::endpoint(file))
++ {
++ session_ptr new_session(new session(session_id, io_service_, server_process_msg));
++ acceptor_.async_accept(new_session->socket(),
++ boost::bind(&CacheServer::handle_accept, this, new_session,
++ boost::asio::placeholders::error));
++ }
++
++ void handle_accept(session_ptr new_session,
++ const boost::system::error_code& error)
++ {
++ //TODO(): open librbd snap
++ if (!error) {
++ new_session->start();
++ session_map.emplace(session_id, new_session);
++ session_id++;
++ new_session.reset(new session(session_id, io_service_, server_process_msg));
++ acceptor_.async_accept(new_session->socket(),
++ boost::bind(&CacheServer::handle_accept, this, new_session,
++ boost::asio::placeholders::error));
++ }
++ }
++
++ void send(uint64_t session_id, std::string msg) {
++ auto it = session_map.find(session_id);
++ if (it != session_map.end()) {
++ it->second->send(msg);
++ }
++ }
++
++private:
++ boost::asio::io_service& io_service_;
++ ProcessMsg server_process_msg;
++ stream_protocol::acceptor acceptor_;
++ uint64_t session_id = 1;
++ std::map<uint64_t, session_ptr> session_map;
++};
++
++#endif
+diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+new file mode 100644
+index 0000000..8e61aa9
+--- /dev/null
++++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+@@ -0,0 +1,131 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CACHE_CONTROLLER_SOCKET_CLIENT_H
++#define CACHE_CONTROLLER_SOCKET_CLIENT_H
++
++#include <boost/asio.hpp>
++#include <boost/bind.hpp>
++#include <boost/algorithm/string.hpp>
++#include "include/assert.h"
++#include "CacheControllerSocketCommon.h"
++
++
++using boost::asio::local::stream_protocol;
++
++class CacheClient {
++public:
++ CacheClient(boost::asio::io_service& io_service,
++ const std::string& file, ClientProcessMsg processmsg)
++ : io_service_(io_service),
++ io_service_work_(io_service),
++ socket_(io_service),
++ m_client_process_msg(processmsg),
++ ep_(stream_protocol::endpoint(file))
++ {
++ std::thread thd([this](){io_service_.run(); });
++ thd.detach();
++ }
++
++ void run(){
++ }
++
++ int connect() {
++ try {
++ socket_.connect(ep_);
++ } catch (std::exception& e) {
++ return -1;
++ }
++ connected = true;
++ return 0;
++ }
++
++ int register_volume(std::string pool_name, std::string vol_name, uint64_t vol_size) {
++ // cache controller will init layout
++ rbdsc_req_type_t *message = new rbdsc_req_type_t();
++ message->type = RBDSC_REGISTER;
++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
++ memcpy(message->vol_name, vol_name.c_str(), vol_name.size());
++ message->vol_size = vol_size;
++ message->offset = 0;
++ message->length = 0;
++ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
++ [this](const boost::system::error_code& err, size_t cb) {
++ if (!err) {
++ boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
++ boost::asio::transfer_exactly(544),
++ [this](const boost::system::error_code& err, size_t cb) {
++ if (!err) {
++ m_client_process_msg(std::string(buffer_, cb));
++ } else {
++ return -1;
++ }
++ });
++ } else {
++ return -1;
++ }
++ });
++
++ return 0;
++ }
++
++ int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) {
++ rbdsc_req_type_t *message = new rbdsc_req_type_t();
++ message->type = RBDSC_READ;
++ memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
++ memcpy(message->vol_name, object_id.c_str(), object_id.size());
++ message->vol_size = 0;
++ message->offset = 0;
++ message->length = 0;
++
++ boost::asio::async_write(socket_, boost::asio::buffer((char*)message, message->size()),
++ [this, result](const boost::system::error_code& err, size_t cb) {
++ if (!err) {
++ get_result(result);
++ } else {
++ return -1;
++ }
++ });
++ std::unique_lock<std::mutex> lk(m);
++ cv.wait(lk);
++ return 0;
++ }
++
++ void get_result(bool* result) {
++ boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
++ boost::asio::transfer_exactly(544),
++ [this, result](const boost::system::error_code& err, size_t cb) {
++ if (!err) {
++ *result = true;
++ cv.notify_one();
++ m_client_process_msg(std::string(buffer_, cb));
++ } else {
++ return -1;
++ }
++ });
++ }
++
++ void handle_connect(const boost::system::error_code& error) {
++ //TODO(): open librbd snap
++ }
++
++ void handle_write(const boost::system::error_code& error) {
++ }
++
++private:
++ boost::asio::io_service& io_service_;
++ boost::asio::io_service::work io_service_work_;
++ stream_protocol::socket socket_;
++ ClientProcessMsg m_client_process_msg;
++ stream_protocol::endpoint ep_;
++ char buffer_[1024];
++ int block_size_ = 1024;
++
++ std::condition_variable cv;
++ std::mutex m;
++
++public:
++ bool connected = false;
++};
++
++#endif
+diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+new file mode 100644
+index 0000000..e253bb1
+--- /dev/null
++++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
+@@ -0,0 +1,43 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
++#define CACHE_CONTROLLER_SOCKET_COMMON_H
++
++#define RBDSC_REGISTER 0X11
++#define RBDSC_READ 0X12
++#define RBDSC_LOOKUP 0X13
++#define RBDSC_REGISTER_REPLY 0X14
++#define RBDSC_READ_REPLY 0X15
++#define RBDSC_LOOKUP_REPLY 0X16
++#define RBDSC_READ_RADOS 0X17
++
++typedef std::function<void(uint64_t, std::string)> ProcessMsg;
++typedef std::function<void(std::string)> ClientProcessMsg;
++typedef uint8_t rbdsc_req_type;
++struct rbdsc_req_type_t {
++ rbdsc_req_type type;
++ uint64_t vol_size;
++ uint64_t offset;
++ uint64_t length;
++ char pool_name[256];
++ char vol_name[256];
++
++ uint64_t size() {
++ return sizeof(rbdsc_req_type_t);
++ }
++
++ std::string to_buffer() {
++ std::stringstream ss;
++ ss << type;
++ ss << vol_size;
++ ss << offset;
++ ss << length;
++ ss << pool_name;
++ ss << vol_name;
++
++ return ss.str();
++ }
++};
++
++#endif
+diff --git a/src/tools/rbd_cache/ObjectCacheStore.cc b/src/tools/rbd_cache/ObjectCacheStore.cc
+new file mode 100644
+index 0000000..90b407c
+--- /dev/null
++++ b/src/tools/rbd_cache/ObjectCacheStore.cc
+@@ -0,0 +1,147 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "ObjectCacheStore.h"
++
++#define dout_context g_ceph_context
++#define dout_subsys ceph_subsys_rbd_cache
++#undef dout_prefix
++#define dout_prefix *_dout << "rbd::cache::ObjectCacheStore: " << this << " " \
++ << __func__ << ": "
++
++
++ObjectCacheStore::ObjectCacheStore(CephContext *cct, ContextWQ* work_queue)
++ : m_cct(cct), m_work_queue(work_queue),
++ m_cache_table_lock("rbd::cache::ObjectCacheStore"),
++ m_rados(new librados::Rados()) {
++}
++
++ObjectCacheStore::~ObjectCacheStore() {
++
++}
++
++int ObjectCacheStore::init(bool reset) {
++
++ int ret = m_rados->init_with_context(m_cct);
++ if(ret < 0) {
++ lderr(m_cct) << "fail to init Ceph context" << dendl;
++ return ret;
++ }
++
++ ret = m_rados->connect();
++ if(ret < 0 ) {
++ lderr(m_cct) << "fail to conect to cluster" << dendl;
++ return ret;
++ }
++ //TODO(): check existing cache objects
++ return ret;
++}
++
++int ObjectCacheStore::do_promote(std::string pool_name, std::string object_name) {
++ int ret = 0;
++ std::string cache_file_name = pool_name + object_name;
++
++ if (m_ioctxs.find(pool_name) == m_ioctxs.end()) {
++ librados::IoCtx* io_ctx = new librados::IoCtx();
++ ret = m_rados->ioctx_create(pool_name.c_str(), *io_ctx);
++ if (ret < 0) {
++ lderr(m_cct) << "fail to create ioctx" << dendl;
++ assert(0);
++ }
++ m_ioctxs.emplace(pool_name, io_ctx);
++ }
++
++ assert(m_ioctxs.find(pool_name) != m_ioctxs.end());
++
++ librados::IoCtx* ioctx = m_ioctxs[pool_name];
++
++ //promoting: update metadata
++ {
++ Mutex::Locker locker(m_cache_table_lock);
++ m_cache_table.emplace(cache_file_name, PROMOTING);
++ }
++
++ librados::bufferlist read_buf;
++ int object_size = 4096*1024; //TODO(): read config from image metadata
++
++ //TODO(): async promote
++ ret = promote_object(ioctx, object_name, read_buf, object_size);
++ if (ret == -ENOENT) {
++ read_buf.append(std::string(object_size, '0'));
++ ret = 0;
++ }
++
++ if( ret < 0) {
++ lderr(m_cct) << "fail to read from rados" << dendl;
++ return ret;
++ }
++
++ // persistent to cache
++ os::CacheStore::SyncFile cache_file(m_cct, cache_file_name);
++ cache_file.open();
++ ret = cache_file.write_object_to_file(read_buf, object_size);
++
++ assert(m_cache_table.find(cache_file_name) != m_cache_table.end());
++
++ // update metadata
++ {
++ Mutex::Locker locker(m_cache_table_lock);
++ m_cache_table[cache_file_name] = PROMOTED;
++ }
++
++ return ret;
++
++}
++
++int ObjectCacheStore::lookup_object(std::string pool_name, std::string object_name) {
++
++ std::string cache_file_name = pool_name + object_name;
++ {
++ Mutex::Locker locker(m_cache_table_lock);
++
++ auto it = m_cache_table.find(cache_file_name);
++ if (it != m_cache_table.end()) {
++
++ if (it->second == PROMOTING) {
++ return -1;
++ } else if (it->second == PROMOTED) {
++ return 0;
++ } else {
++ assert(0);
++ }
++ }
++ }
++
++ int ret = do_promote(pool_name, object_name);
++
++ return ret;
++}
++
++int ObjectCacheStore::shutdown() {
++ m_rados->shutdown();
++ return 0;
++}
++
++int ObjectCacheStore::init_cache(std::string vol_name, uint64_t vol_size) {
++ return 0;
++}
++
++int ObjectCacheStore::lock_cache(std::string vol_name) {
++ return 0;
++}
++
++int ObjectCacheStore::promote_object(librados::IoCtx* ioctx, std::string object_name, librados::bufferlist read_buf, uint64_t read_len) {
++ int ret;
++
++ librados::AioCompletion* read_completion = librados::Rados::aio_create_completion();
++
++ ret = ioctx->aio_read(object_name, read_completion, &read_buf, read_len, 0);
++ if(ret < 0) {
++ lderr(m_cct) << "fail to read from rados" << dendl;
++ return ret;
++ }
++ read_completion->wait_for_complete();
++ ret = read_completion->get_return_value();
++ return ret;
++
++}
+diff --git a/src/tools/rbd_cache/ObjectCacheStore.h b/src/tools/rbd_cache/ObjectCacheStore.h
+new file mode 100644
+index 0000000..12f8399
+--- /dev/null
++++ b/src/tools/rbd_cache/ObjectCacheStore.h
+@@ -0,0 +1,65 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#ifndef OBJECT_CACHE_STORE_H
++#define OBJECT_CACHE_STORE_H
++
++#include "common/debug.h"
++#include "common/errno.h"
++#include "common/ceph_context.h"
++#include "common/Mutex.h"
++#include "include/rados/librados.hpp"
++#include "include/rbd/librbd.h"
++#include "librbd/ImageCtx.h"
++#include "librbd/ImageState.h"
++#include "os/CacheStore/SyncFile.h"
++
++using librados::Rados;
++using librados::IoCtx;
++
++typedef shared_ptr<librados::Rados> RadosRef;
++typedef shared_ptr<librados::IoCtx> IoCtxRef;
++
++class ObjectCacheStore
++{
++ public:
++ ObjectCacheStore(CephContext *cct, ContextWQ* work_queue);
++ ~ObjectCacheStore();
++
++ int init(bool reset);
++
++ int shutdown();
++
++ int lookup_object(std::string pool_name, std::string object_name);
++
++ int init_cache(std::string vol_name, uint64_t vol_size);
++
++ int lock_cache(std::string vol_name);
++
++ private:
++ int _evict_object();
++
++ int do_promote(std::string pool_name, std::string object_name);
++
++ int promote_object(librados::IoCtx*, std::string object_name,
++ librados::bufferlist read_buf,
++ uint64_t length);
++
++ enum {
++ PROMOTING = 0,
++ PROMOTED,
++ };
++
++ CephContext *m_cct;
++ ContextWQ* m_work_queue;
++ Mutex m_cache_table_lock;
++ RadosRef m_rados;
++
++ std::map<std::string, uint8_t> m_cache_table;
++
++ std::map<std::string, librados::IoCtx*> m_ioctxs;
++
++ os::CacheStore::SyncFile *m_cache_file;
++};
++
++#endif
+diff --git a/src/tools/rbd_cache/main.cc b/src/tools/rbd_cache/main.cc
+new file mode 100644
+index 0000000..336a581
+--- /dev/null
++++ b/src/tools/rbd_cache/main.cc
+@@ -0,0 +1,85 @@
++// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
++// vim: ts=8 sw=2 smarttab
++
++#include "common/ceph_argparse.h"
++#include "common/config.h"
++#include "common/debug.h"
++#include "common/errno.h"
++#include "global/global_init.h"
++#include "global/signal_handler.h"
++#include "CacheController.hpp"
++
++#include <vector>
++
++CacheController *cachectl = nullptr;
++
++void usage() {
++ std::cout << "usage: cache controller [options...]" << std::endl;
++ std::cout << "options:\n";
++ std::cout << " -m monaddress[:port] connect to specified monitor\n";
++ std::cout << " --keyring=<path> path to keyring for local cluster\n";
++ std::cout << " --log-file=<logfile> file to log debug output\n";
++ std::cout << " --debug-rbd-cachecontroller=<log-level>/<memory-level> set rbd-mirror debug level\n";
++ generic_server_usage();
++}
++
++static void handle_signal(int signum)
++{
++ if (cachectl)
++ cachectl->handle_signal(signum);
++}
++
++int main(int argc, const char **argv)
++{
++ std::vector<const char*> args;
++ env_to_vec(args);
++ argv_to_vec(argc, argv, args);
++
++ auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_CLIENT,
++ CODE_ENVIRONMENT_DAEMON,
++ CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
++
++ for (auto i = args.begin(); i != args.end(); ++i) {
++ if (ceph_argparse_flag(args, i, "-h", "--help", (char*)NULL)) {
++ usage();
++ return EXIT_SUCCESS;
++ }
++ }
++
++ if (g_conf->daemonize) {
++ global_init_daemonize(g_ceph_context);
++ }
++ g_ceph_context->enable_perf_counter();
++
++ common_init_finish(g_ceph_context);
++
++ init_async_signal_handler();
++ register_async_signal_handler(SIGHUP, sighup_handler);
++ register_async_signal_handler_oneshot(SIGINT, handle_signal);
++ register_async_signal_handler_oneshot(SIGTERM, handle_signal);
++
++ std::vector<const char*> cmd_args;
++ argv_to_vec(argc, argv, cmd_args);
++
++ // disable unnecessary librbd cache
++ g_ceph_context->_conf->set_val_or_die("rbd_cache", "false");
++
++ cachectl = new CacheController(g_ceph_context, cmd_args);
++ int r = cachectl->init();
++ if (r < 0) {
++ std::cerr << "failed to initialize: " << cpp_strerror(r) << std::endl;
++ goto cleanup;
++ }
++
++ cachectl->run();
++
++ cleanup:
++ unregister_async_signal_handler(SIGHUP, sighup_handler);
++ unregister_async_signal_handler(SIGINT, handle_signal);
++ unregister_async_signal_handler(SIGTERM, handle_signal);
++ shutdown_async_signal_handler();
++
++ delete cachectl;
++
++ return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
++}
+--
+2.7.4
+