summaryrefslogtreecommitdiffstats
path: root/src/ceph/0008-librbd-implement-async-cache-lookup-and-read.patch
blob: 16dd41fb9d5870d24999a347cc00353460b66570 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
From 9f8ff821dfc98dfc3cdb557b736ce455a3ae6162 Mon Sep 17 00:00:00 2001
From: Yuan Zhou <yuan.zhou@intel.com>
Date: Thu, 16 Aug 2018 17:28:46 +0800
Subject: [PATCH 08/10] librbd: implement async cache lookup and read

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
---
 .../SharedPersistentObjectCacherObjectDispatch.cc  | 63 ++++++++++++----------
 .../SharedPersistentObjectCacherObjectDispatch.h   |  7 +++
 src/tools/rbd_cache/CacheController.cc             |  9 ++--
 src/tools/rbd_cache/CacheControllerSocket.hpp      |  8 ++-
 .../rbd_cache/CacheControllerSocketClient.hpp      | 49 +++++++++--------
 src/tools/rbd_cache/CacheControllerSocketCommon.h  | 12 +++++
 6 files changed, 94 insertions(+), 54 deletions(-)

diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
index 2aa5cad..407ce49 100644
--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.cc
@@ -29,13 +29,8 @@ SharedPersistentObjectCacherObjectDispatch<I>::SharedPersistentObjectCacherObjec
 
 template <typename I>
 SharedPersistentObjectCacherObjectDispatch<I>::~SharedPersistentObjectCacherObjectDispatch() {
-  if (m_object_store) {
     delete m_object_store;
-  }
-
-  if (m_cache_client) {
     delete m_cache_client;
-  }
 }
 
 template <typename I>
@@ -88,34 +83,48 @@ bool SharedPersistentObjectCacherObjectDispatch<I>::read(
   ldout(cct, 20) << "object_no=" << object_no << " " << object_off << "~"
                  << object_len << dendl;
 
-  // ensure we aren't holding the cache lock post-read
   on_dispatched = util::create_async_context_callback(*m_image_ctx,
                                                       on_dispatched);
+  auto ctx = new FunctionContext([this, oid, object_off, object_len, read_data, dispatch_result, on_dispatched](bool cache) {
+    handle_read_cache(cache, oid, object_off, object_len, read_data, dispatch_result, on_dispatched);
+  });
 
   if (m_cache_client && m_cache_client->connected && m_object_store) {
-    bool exists;
     m_cache_client->lookup_object(m_image_ctx->data_ctx.get_pool_name(),
-      m_image_ctx->id, oid, &exists);
-
-    // try to read from parent image
-    ldout(cct, 20) << "SRO cache object exists:" << exists << dendl;
-    if (exists) {
-      int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
-      if (r != 0) {
-        *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
-	on_dispatched->complete(r);
-        return true;
-      }
-    }
+      m_image_ctx->id, oid, ctx);
   }
-
-  ldout(cct, 20) << "Continue read from RADOS" << dendl;
-  *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
-  on_dispatched->complete(0);
   return true;
 }
 
 template <typename I>
+int SharedPersistentObjectCacherObjectDispatch<I>::handle_read_cache(
+    bool cache,
+    const std::string &oid, uint64_t object_off, uint64_t object_len,
+    ceph::bufferlist* read_data, io::DispatchResult* dispatch_result,
+    Context* on_dispatched) {
+  // IO chained in reverse order
+  auto cct = m_image_ctx->cct;
+  ldout(cct, 20) << dendl;
+
+  // try to read from parent image
+  if (cache) {
+    int r = m_object_store->read_object(oid, read_data, object_off, object_len, on_dispatched);
+    if (r != 0) {
+      *dispatch_result = io::DISPATCH_RESULT_COMPLETE;
+      //TODO(): complete in syncfile
+      on_dispatched->complete(r);
+      ldout(cct, 20) << "AAAAcomplete=" << *dispatch_result <<dendl;
+      return true;
+    }
+  } else {
+    *dispatch_result = io::DISPATCH_RESULT_CONTINUE;
+    on_dispatched->complete(0);
+    ldout(cct, 20) << "BBB no cache" << *dispatch_result <<dendl;
+    return false;
+  }
+}
+
+template <typename I>
 void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::string msg) {
   auto cct = m_image_ctx->cct;
   ldout(cct, 20) << dendl;
@@ -123,26 +132,26 @@ void SharedPersistentObjectCacherObjectDispatch<I>::client_handle_request(std::s
   rbd::cache::rbdsc_req_type_t *io_ctx = (rbd::cache::rbdsc_req_type_t*)(msg.c_str());
 
   switch (io_ctx->type) {
-    case RBDSC_REGISTER_REPLY: {
+    case rbd::cache::RBDSC_REGISTER_REPLY: {
       // open cache handler for volume        
       ldout(cct, 20) << "SRO cache client open cache handler" << dendl;
       m_object_store = new SharedPersistentObjectCacher<I>(m_image_ctx, m_image_ctx->shared_cache_path);
 
       break;
     }
-    case RBDSC_READ_REPLY: {
+    case rbd::cache::RBDSC_READ_REPLY: {
       ldout(cct, 20) << "SRO cache client start to read cache" << dendl;
       //TODO(): should call read here
 
       break;
     }
-    case RBDSC_READ_RADOS: {
+    case rbd::cache::RBDSC_READ_RADOS: {
       ldout(cct, 20) << "SRO cache client start to read rados" << dendl;
       //TODO(): should call read here
 
       break;
     }
-    default: ldout(cct, 20) << "nothing" << dendl;
+    default: ldout(cct, 20) << "nothing" << io_ctx->type <<dendl;
       break;
     
   }
diff --git a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
index 200688f..36b868a 100644
--- a/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
+++ b/src/librbd/cache/SharedPersistentObjectCacherObjectDispatch.h
@@ -112,6 +112,13 @@ public:
 
 private:
 
+  int handle_read_cache(
+      bool cache,
+      const std::string &oid, uint64_t object_off,
+      uint64_t object_len, ceph::bufferlist* read_data,
+      io::DispatchResult* dispatch_result,
+      Context* on_dispatched);
+
   ImageCtxT* m_image_ctx;
 
   void client_handle_request(std::string msg);
diff --git a/src/tools/rbd_cache/CacheController.cc b/src/tools/rbd_cache/CacheController.cc
index cefcf28..c9d674b 100644
--- a/src/tools/rbd_cache/CacheController.cc
+++ b/src/tools/rbd_cache/CacheController.cc
@@ -76,7 +76,7 @@ void CacheController::run() {
   }
 }
 
-void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
+void CacheController::handle_request(uint64_t session_id, std::string msg){
   rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(msg.c_str());
 
   int ret = 0;
@@ -86,7 +86,7 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
       // init cache layout for volume        
       m_object_cache_store->init_cache(io_ctx->vol_name, io_ctx->vol_size);
       io_ctx->type = RBDSC_REGISTER_REPLY;
-      m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
+      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
 
       break;
     }
@@ -98,7 +98,10 @@ void CacheController::handle_request(uint64_t sesstion_id, std::string msg){
       } else {
         io_ctx->type = RBDSC_READ_REPLY;
       }
-      m_cache_server->send(sesstion_id, std::string((char*)io_ctx, msg.size()));
+      if (io_ctx->type != RBDSC_READ_REPLY) {
+        assert(0);
+      }
+      m_cache_server->send(session_id, std::string((char*)io_ctx, msg.size()));
 
       break;
     }
diff --git a/src/tools/rbd_cache/CacheControllerSocket.hpp b/src/tools/rbd_cache/CacheControllerSocket.hpp
index 967af1d..d178b58 100644
--- a/src/tools/rbd_cache/CacheControllerSocket.hpp
+++ b/src/tools/rbd_cache/CacheControllerSocket.hpp
@@ -43,7 +43,9 @@ public:
   void handle_read(const boost::system::error_code& error, size_t bytes_transferred) {
 
     if (!error) {
-     
+      if(bytes_transferred != 544){
+	assert(0);
+      }
       process_msg(session_id, std::string(data_, bytes_transferred));
 
     }
@@ -51,7 +53,8 @@ public:
 
   void handle_write(const boost::system::error_code& error) {
     if (!error) {
-      socket_.async_read_some(boost::asio::buffer(data_),
+    boost::asio::async_read(socket_, boost::asio::buffer(data_),
+                            boost::asio::transfer_exactly(544),
           boost::bind(&session::handle_read,
             shared_from_this(),
             boost::asio::placeholders::error,
@@ -63,6 +66,7 @@ public:
 
       boost::asio::async_write(socket_,
           boost::asio::buffer(msg.c_str(), msg.size()),
+          boost::asio::transfer_exactly(544),
           boost::bind(&session::handle_write,
             shared_from_this(),
             boost::asio::placeholders::error));
diff --git a/src/tools/rbd_cache/CacheControllerSocketClient.hpp b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
index 56b79ce..3b0ca00 100644
--- a/src/tools/rbd_cache/CacheControllerSocketClient.hpp
+++ b/src/tools/rbd_cache/CacheControllerSocketClient.hpp
@@ -8,6 +8,7 @@
 #include <boost/bind.hpp>
 #include <boost/algorithm/string.hpp>
 #include "include/assert.h"
+#include "include/Context.h"
 #include "CacheControllerSocketCommon.h"
 
 
@@ -26,8 +27,12 @@ public:
       m_client_process_msg(processmsg),
       ep_(stream_protocol::endpoint(file))
   {
-     std::thread thd([this](){io_service_.run(); });
-     thd.detach();
+     io_thread.reset(new std::thread([this](){io_service_.run(); }));
+  }
+
+  ~CacheClient() {
+    io_service_.stop();
+    io_thread->join();
   }
 
   void run(){
@@ -53,7 +58,8 @@ public:
     message->offset = 0;
     message->length = 0;
     boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
-        [this](const boost::system::error_code& err, size_t cb) {
+        [this, message](const boost::system::error_code& err, size_t cb) {
+        delete message;
         if (!err) {
           boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
               boost::asio::transfer_exactly(544),
@@ -72,7 +78,7 @@ public:
     return 0;
   }
 
-  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, bool* result) {
+  int lookup_object(std::string pool_name, std::string vol_name, std::string object_id, Context* on_finish) {
     rbdsc_req_type_t *message = new rbdsc_req_type_t();
     message->type = RBDSC_READ;
     memcpy(message->pool_name, pool_name.c_str(), pool_name.size());
@@ -82,49 +88,48 @@ public:
     message->length = 0;
 
     boost::asio::async_write(socket_,  boost::asio::buffer((char*)message, message->size()),
-        [this, result](const boost::system::error_code& err, size_t cb) {
+        [this, on_finish, message](const boost::system::error_code& err, size_t cb) {
+        delete message;
         if (!err) {
-          get_result(result);
+          get_result(on_finish);
         } else {
           return -1;
         }
     });
-    std::unique_lock<std::mutex> lk(m);
-    //cv.wait(lk);
-    cv.wait_for(lk, std::chrono::milliseconds(100));
+
     return 0;
   }
 
-  void get_result(bool* result) {
+  void get_result(Context* on_finish) {
     boost::asio::async_read(socket_, boost::asio::buffer(buffer_),
         boost::asio::transfer_exactly(544),
-        [this, result](const boost::system::error_code& err, size_t cb) {
+        [this, on_finish](const boost::system::error_code& err, size_t cb) {
+        if (cb != 544) {
+	  assert(0);
+        }
         if (!err) {
 	    rbdsc_req_type_t *io_ctx = (rbdsc_req_type_t*)(buffer_);
             if (io_ctx->type == RBDSC_READ_REPLY) {
-	      *result = true;
+	      on_finish->complete(true);
+              return;
             } else {
-	      *result = false;
+	      on_finish->complete(false);
+              return;
             }
-            cv.notify_one();
-            m_client_process_msg(std::string(buffer_, cb));
         } else {
-            return -1;
+	    assert(0);
+            return on_finish->complete(false);
         }
     });
   }
 
-  void handle_connect(const boost::system::error_code& error) {
-    //TODO(): open librbd snap
-  }
-
-  void handle_write(const boost::system::error_code& error) {
-  }
 
 private:
   boost::asio::io_service& io_service_;
   boost::asio::io_service::work io_service_work_;
   stream_protocol::socket socket_;
+
+  std::shared_ptr<std::thread> io_thread;
   ClientProcessMsg m_client_process_msg;
   stream_protocol::endpoint ep_;
   char buffer_[1024];
diff --git a/src/tools/rbd_cache/CacheControllerSocketCommon.h b/src/tools/rbd_cache/CacheControllerSocketCommon.h
index a9d73a8..e026ec8 100644
--- a/src/tools/rbd_cache/CacheControllerSocketCommon.h
+++ b/src/tools/rbd_cache/CacheControllerSocketCommon.h
@@ -4,6 +4,7 @@
 #ifndef CACHE_CONTROLLER_SOCKET_COMMON_H
 #define CACHE_CONTROLLER_SOCKET_COMMON_H
 
+/*
 #define RBDSC_REGISTER         0X11
 #define RBDSC_READ             0X12
 #define RBDSC_LOOKUP           0X13
@@ -11,10 +12,21 @@
 #define RBDSC_READ_REPLY       0X15
 #define RBDSC_LOOKUP_REPLY     0X16
 #define RBDSC_READ_RADOS       0X17
+*/
 
 namespace rbd {
 namespace cache {
 
+static const int RBDSC_REGISTER        =  0X11;
+static const int RBDSC_READ            =  0X12;
+static const int RBDSC_LOOKUP          =  0X13;
+static const int RBDSC_REGISTER_REPLY  =  0X14;
+static const int RBDSC_READ_REPLY      =  0X15;
+static const int RBDSC_LOOKUP_REPLY    =  0X16;
+static const int RBDSC_READ_RADOS      =  0X17;
+
+
+
 typedef std::function<void(uint64_t, std::string)> ProcessMsg;
 typedef std::function<void(std::string)> ClientProcessMsg;
 typedef uint8_t rbdsc_req_type;
-- 
2.7.4