diff options
Diffstat (limited to 'kernel/net/ceph/osd_client.c')
-rw-r--r-- | kernel/net/ceph/osd_client.c | 140 |
1 files changed, 69 insertions, 71 deletions
diff --git a/kernel/net/ceph/osd_client.c b/kernel/net/ceph/osd_client.c index c4ec92392..a28e47ff1 100644 --- a/kernel/net/ceph/osd_client.c +++ b/kernel/net/ceph/osd_client.c @@ -120,11 +120,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, } #endif /* CONFIG_BLOCK */ -#define osd_req_op_data(oreq, whch, typ, fld) \ - ({ \ - BUG_ON(whch >= (oreq)->r_num_ops); \ - &(oreq)->r_ops[whch].typ.fld; \ - }) +#define osd_req_op_data(oreq, whch, typ, fld) \ +({ \ + struct ceph_osd_request *__oreq = (oreq); \ + unsigned int __whch = (whch); \ + BUG_ON(__whch >= __oreq->r_num_ops); \ + &__oreq->r_ops[__whch].typ.fld; \ +}) static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) @@ -285,6 +287,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, switch (op->op) { case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: ceph_osd_data_release(&op->extent.osd_data); break; case CEPH_OSD_OP_CALL: @@ -296,6 +299,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_CMPXATTR: ceph_osd_data_release(&op->xattr.osd_data); break; + case CEPH_OSD_OP_STAT: + ceph_osd_data_release(&op->raw_data_in); + break; default: break; } @@ -450,7 +456,7 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE) */ static struct ceph_osd_req_op * _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, - u16 opcode) + u16 opcode, u32 flags) { struct ceph_osd_req_op *op; @@ -460,14 +466,15 @@ _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, op = &osd_req->r_ops[which]; memset(op, 0, sizeof (*op)); op->op = opcode; + op->flags = flags; return op; } void osd_req_op_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode) + unsigned int which, u16 opcode, u32 flags) { - (void)_osd_req_op_init(osd_req, which, opcode); + (void)_osd_req_op_init(osd_req, which, opcode, flags); } EXPORT_SYMBOL(osd_req_op_init); @@ -476,17 +483,19 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, u64 offset, u64 length, u64 truncate_size, u32 truncate_seq) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); size_t payload_len = 0; BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && - opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE); + opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && + opcode != CEPH_OSD_OP_TRUNCATE); op->extent.offset = offset; op->extent.length = length; op->extent.truncate_size = truncate_size; op->extent.truncate_seq = truncate_seq; - if (opcode == CEPH_OSD_OP_WRITE) + if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) payload_len += length; op->payload_len = payload_len; @@ -515,7 +524,8 @@ EXPORT_SYMBOL(osd_req_op_extent_update); void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); struct ceph_pagelist *pagelist; size_t payload_len = 0; size_t size; @@ -552,7 +562,8 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *name, const void *value, size_t size, u8 cmp_op, u8 cmp_mode) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); struct ceph_pagelist *pagelist; size_t payload_len; @@ -585,7 +596,8 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u64 cookie, u64 version, int flag) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); @@ -602,7 +614,8 @@ void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, u64 expected_write_size) { struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - CEPH_OSD_OP_SETALLOCHINT); + CEPH_OSD_OP_SETALLOCHINT, + 0); op->alloc_hint.expected_object_size = expected_object_size; op->alloc_hint.expected_write_size = expected_write_size; @@ -661,9 +674,11 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: case CEPH_OSD_OP_ZERO: case CEPH_OSD_OP_TRUNCATE: - if (src->op == CEPH_OSD_OP_WRITE) + if (src->op == CEPH_OSD_OP_WRITE || + src->op == CEPH_OSD_OP_WRITEFULL) request_data_len = src->extent.length; dst->extent.offset = cpu_to_le64(src->extent.offset); dst->extent.length = cpu_to_le64(src->extent.length); @@ -672,7 +687,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); osd_data = &src->extent.osd_data; - if (src->op == CEPH_OSD_OP_WRITE) + if (src->op == CEPH_OSD_OP_WRITE || + src->op == CEPH_OSD_OP_WRITEFULL) ceph_osdc_msg_data_add(req->r_request, osd_data); else ceph_osdc_msg_data_add(req->r_reply, osd_data); @@ -786,7 +802,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, } if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { - osd_req_op_init(req, which, opcode); + osd_req_op_init(req, which, opcode, 0); } else { u32 object_size = le32_to_cpu(layout->fl_object_size); u32 object_base = off - objoff; @@ -1088,7 +1104,7 @@ static void __move_osd_to_lru(struct ceph_osd_client *osdc, BUG_ON(!list_empty(&osd->o_osd_lru)); list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); - osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; + osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl; } static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc, @@ -1199,7 +1215,7 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) static void __schedule_osd_timeout(struct ceph_osd_client *osdc) { schedule_delayed_work(&osdc->timeout_work, - osdc->client->options->osd_keepalive_timeout * HZ); + osdc->client->options->osd_keepalive_timeout); } static void __cancel_osd_timeout(struct ceph_osd_client *osdc) @@ -1567,10 +1583,9 @@ static void handle_timeout(struct work_struct *work) { struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, timeout_work.work); + struct ceph_options *opts = osdc->client->options; struct ceph_osd_request *req; struct ceph_osd *osd; - unsigned long keepalive = - osdc->client->options->osd_keepalive_timeout * HZ; struct list_head slow_osds; dout("timeout\n"); down_read(&osdc->map_sem); @@ -1586,7 +1601,8 @@ static void handle_timeout(struct work_struct *work) */ INIT_LIST_HEAD(&slow_osds); list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { - if (time_before(jiffies, req->r_stamp + keepalive)) + if (time_before(jiffies, + req->r_stamp + opts->osd_keepalive_timeout)) break; osd = req->r_osd; @@ -1613,8 +1629,7 @@ static void handle_osds_timeout(struct work_struct *work) struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, osds_timeout_work.work); - unsigned long delay = - osdc->client->options->osd_idle_ttl * HZ >> 2; + unsigned long delay = osdc->client->options->osd_idle_ttl / 4; dout("osds timeout\n"); down_read(&osdc->map_sem); @@ -1737,8 +1752,7 @@ static void complete_request(struct ceph_osd_request *req) * handle osd op reply. either call the callback if it is specified, * or do the completion to wake up the waiting thread. */ -static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, - struct ceph_connection *con) +static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) { void *p, *end; struct ceph_osd_request *req; @@ -2619,7 +2633,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) osdc->event_count = 0; schedule_delayed_work(&osdc->osds_timeout_work, - round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); + round_jiffies_relative(osdc->client->options->osd_idle_ttl)); err = -ENOMEM; osdc->req_mempool = mempool_create_kmalloc_pool(10, @@ -2794,7 +2808,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ceph_osdc_handle_map(osdc, msg); break; case CEPH_MSG_OSD_OPREPLY: - handle_reply(osdc, msg, con); + handle_reply(osdc, msg); break; case CEPH_MSG_WATCH_NOTIFY: handle_watch_notify(osdc, msg); @@ -2809,8 +2823,9 @@ out: } /* - * lookup and return message for incoming reply. set up reply message - * pages. + * Lookup and return message for incoming reply. Don't try to do + * anything about a larger than preallocated data portion of the + * message at the moment - for now, just skip the message. */ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_msg_header *hdr, @@ -2828,23 +2843,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); if (!req) { - *skip = 1; + dout("%s osd%d tid %llu unknown, skipping\n", __func__, + osd->o_osd, tid); m = NULL; - dout("get_reply unknown tid %llu from osd%d\n", tid, - osd->o_osd); + *skip = 1; goto out; } - if (req->r_reply->con) - dout("%s revoking msg %p from old con %p\n", __func__, - req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); if (front_len > req->r_reply->front_alloc_len) { - pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n", - front_len, req->r_reply->front_alloc_len, - (unsigned int)con->peer_name.type, - le64_to_cpu(con->peer_name.num)); + pr_warn("%s osd%d tid %llu front %d > preallocated %d\n", + __func__, osd->o_osd, req->r_tid, front_len, + req->r_reply->front_alloc_len); m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, false); if (!m) @@ -2852,37 +2863,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ceph_msg_put(req->r_reply); req->r_reply = m; } - m = ceph_msg_get(req->r_reply); - - if (data_len > 0) { - struct ceph_osd_data *osd_data; - /* - * XXX This is assuming there is only one op containing - * XXX page data. Probably OK for reads, but this - * XXX ought to be done more generally. - */ - osd_data = osd_req_op_extent_osd_data(req, 0); - if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { - if (osd_data->pages && - unlikely(osd_data->length < data_len)) { - - pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n", - tid, data_len, osd_data->length); - *skip = 1; - ceph_msg_put(m); - m = NULL; - goto out; - } - } + if (data_len > req->r_reply->data_length) { + pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", + __func__, osd->o_osd, req->r_tid, data_len, + req->r_reply->data_length); + m = NULL; + *skip = 1; + goto out; } - *skip = 0; + + m = ceph_msg_get(req->r_reply); dout("get_reply tid %lld %p\n", tid, m); out: mutex_unlock(&osdc->request_mutex); return m; - } static struct ceph_msg *alloc_msg(struct ceph_connection *con, @@ -2980,17 +2976,19 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&osdc->client->monc); } -static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +static int osd_sign_message(struct ceph_msg *msg) { - struct ceph_osd *o = con->private; + struct ceph_osd *o = msg->con->private; struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_sign_message(auth, msg); } -static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +static int osd_check_message_signature(struct ceph_msg *msg) { - struct ceph_osd *o = con->private; + struct ceph_osd *o = msg->con->private; struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_check_message_signature(auth, msg); } @@ -3002,7 +3000,7 @@ static const struct ceph_connection_operations osd_con_ops = { .verify_authorizer_reply = verify_authorizer_reply, .invalidate_authorizer = invalidate_authorizer, .alloc_msg = alloc_msg, - .sign_message = sign_message, - .check_message_signature = check_message_signature, + .sign_message = osd_sign_message, + .check_message_signature = osd_check_message_signature, .fault = osd_reset, }; |