diff options
Diffstat (limited to 'kernel/net/ceph/messenger.c')
-rw-r--r-- | kernel/net/ceph/messenger.c | 291 |
1 files changed, 187 insertions, 104 deletions
diff --git a/kernel/net/ceph/messenger.c b/kernel/net/ceph/messenger.c index 967080a9f..63ae5dd24 100644 --- a/kernel/net/ceph/messenger.c +++ b/kernel/net/ceph/messenger.c @@ -6,6 +6,7 @@ #include <linux/inet.h> #include <linux/kthread.h> #include <linux/net.h> +#include <linux/nsproxy.h> #include <linux/slab.h> #include <linux/socket.h> #include <linux/string.h> @@ -162,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; +static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; #ifdef CONFIG_LOCKDEP static struct lock_class_key socket_class; @@ -175,7 +177,7 @@ static struct lock_class_key socket_class; static void queue_con(struct ceph_connection *con); static void cancel_con(struct ceph_connection *con); -static void con_work(struct work_struct *); +static void ceph_con_workfn(struct work_struct *); static void con_fault(struct ceph_connection *con); /* @@ -275,23 +277,22 @@ static void _ceph_msgr_exit(void) ceph_msgr_wq = NULL; } - ceph_msgr_slab_exit(); - BUG_ON(zero_page == NULL); - kunmap(zero_page); page_cache_release(zero_page); zero_page = NULL; + + ceph_msgr_slab_exit(); } int ceph_msgr_init(void) { + if (ceph_msgr_slab_init()) + return -ENOMEM; + BUG_ON(zero_page != NULL); zero_page = ZERO_PAGE(0); page_cache_get(zero_page); - if (ceph_msgr_slab_init()) - return -ENOMEM; - /* * The number of active work items is limited by the number of * connections, so leave @max_active at default. @@ -480,8 +481,8 @@ static int ceph_tcp_connect(struct ceph_connection *con) int ret; BUG_ON(con->sock); - ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, - IPPROTO_TCP, &sock); + ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family, + SOCK_STREAM, IPPROTO_TCP, &sock); if (ret) return ret; sock->sk->sk_allocation = GFP_NOFS; @@ -508,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } - if (con->msgr->tcp_nodelay) { + if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) { int optval = 1; ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, @@ -636,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con) static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); - BUG_ON(msg->con == NULL); - msg->con->ops->put(msg->con); - msg->con = NULL; ceph_msg_put(msg); } @@ -661,20 +659,21 @@ static void reset_connection(struct ceph_connection *con) if (con->in_msg) { BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->ops->put(con); } con->connect_seq = 0; con->out_seq = 0; if (con->out_msg) { + BUG_ON(con->out_msg->con != con); ceph_msg_put(con->out_msg); con->out_msg = NULL; } con->in_seq = 0; con->in_seq_acked = 0; + + con->out_skip = 0; } /* @@ -749,7 +748,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); - INIT_DELAYED_WORK(&con->work, con_work); + INIT_DELAYED_WORK(&con->work, ceph_con_workfn); con->state = CON_STATE_CLOSED; } @@ -774,6 +773,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) static void con_out_kvec_reset(struct ceph_connection *con) { + BUG_ON(con->out_skip); + con->out_kvec_left = 0; con->out_kvec_bytes = 0; con->out_kvec_cur = &con->out_kvec[0]; @@ -782,9 +783,9 @@ static void con_out_kvec_reset(struct ceph_connection *con) static void con_out_kvec_add(struct ceph_connection *con, size_t size, void *data) { - int index; + int index = con->out_kvec_left; - index = con->out_kvec_left; + BUG_ON(con->out_skip); BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); con->out_kvec[index].iov_len = size; @@ -793,6 +794,27 @@ static void con_out_kvec_add(struct ceph_connection *con, con->out_kvec_bytes += size; } +/* + * Chop off a kvec from the end. Return residual number of bytes for + * that kvec, i.e. how many bytes would have been written if the kvec + * hadn't been nuked. + */ +static int con_out_kvec_skip(struct ceph_connection *con) +{ + int off = con->out_kvec_cur - con->out_kvec; + int skip = 0; + + if (con->out_kvec_bytes > 0) { + skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; + BUG_ON(con->out_kvec_bytes < skip); + BUG_ON(!con->out_kvec_left); + con->out_kvec_bytes -= skip; + con->out_kvec_left--; + } + + return skip; +} + #ifdef CONFIG_BLOCK /* @@ -1178,6 +1200,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, return new_piece; } +static size_t sizeof_footer(struct ceph_connection *con) +{ + return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? + sizeof(struct ceph_msg_footer) : + sizeof(struct ceph_msg_footer_old); +} + static void prepare_message_data(struct ceph_msg *msg, u32 data_len) { BUG_ON(!msg); @@ -1200,11 +1229,10 @@ static void prepare_write_message_footer(struct ceph_connection *con) m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; dout("prepare_write_message_footer %p\n", con); - con->out_kvec_is_msg = true; con->out_kvec[v].iov_base = &m->footer; if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { if (con->ops->sign_message) - con->ops->sign_message(con, m); + con->ops->sign_message(m); else m->footer.sig = 0; con->out_kvec[v].iov_len = sizeof(m->footer); @@ -1228,7 +1256,6 @@ static void prepare_write_message(struct ceph_connection *con) u32 crc; con_out_kvec_reset(con); - con->out_kvec_is_msg = true; con->out_msg_done = false; /* Sneak an ack in there first? If we can get it into the same @@ -1268,18 +1295,19 @@ static void prepare_write_message(struct ceph_connection *con) /* tag + hdr + front + middle */ con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); - con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); + con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); if (m->middle) con_out_kvec_add(con, m->middle->vec.iov_len, m->middle->vec.iov_base); - /* fill in crc (except data pages), footer */ + /* fill in hdr crc and finalize hdr */ crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); con->out_msg->hdr.crc = cpu_to_le32(crc); - con->out_msg->footer.flags = 0; + memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); + /* fill in front and middle crc, footer */ crc = crc32c(0, m->front.iov_base, m->front.iov_len); con->out_msg->footer.front_crc = cpu_to_le32(crc); if (m->middle) { @@ -1291,6 +1319,7 @@ static void prepare_write_message(struct ceph_connection *con) dout("%s front_crc %u middle_crc %u\n", __func__, le32_to_cpu(con->out_msg->footer.front_crc), le32_to_cpu(con->out_msg->footer.middle_crc)); + con->out_msg->footer.flags = 0; /* is there a data payload? */ con->out_msg->footer.data_crc = 0; @@ -1351,7 +1380,16 @@ static void prepare_write_keepalive(struct ceph_connection *con) { dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); - con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); + if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { + struct timespec now = CURRENT_TIME; + + con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); + ceph_encode_timespec(&con->out_temp_keepalive2, &now); + con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), + &con->out_temp_keepalive2); + } else { + con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); + } con_flag_set(con, CON_FLAG_WRITE_PENDING); } @@ -1422,7 +1460,8 @@ static int prepare_write_connect(struct ceph_connection *con) dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, con->connect_seq, global_seq, proto); - con->out_connect.features = cpu_to_le64(con->msgr->supported_features); + con->out_connect.features = + cpu_to_le64(from_msgr(con->msgr)->supported_features); con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(global_seq); @@ -1485,7 +1524,6 @@ static int write_partial_kvec(struct ceph_connection *con) } } con->out_kvec_left = 0; - con->out_kvec_is_msg = false; ret = 1; out: dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, @@ -1517,7 +1555,7 @@ static int write_partial_message_data(struct ceph_connection *con) { struct ceph_msg *msg = con->out_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; - bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); u32 crc; dout("%s %p msg %p\n", __func__, con, msg); @@ -1542,10 +1580,10 @@ static int write_partial_message_data(struct ceph_connection *con) bool need_crc; int ret; - page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, - &last_piece); + page = ceph_msg_data_next(cursor, &page_offset, &length, + &last_piece); ret = ceph_tcp_sendpage(con->sock, page, page_offset, - length, last_piece); + length, !last_piece); if (ret <= 0) { if (do_datacrc) msg->footer.data_crc = cpu_to_le32(crc); @@ -1554,7 +1592,7 @@ static int write_partial_message_data(struct ceph_connection *con) } if (do_datacrc && cursor->need_crc) crc = ceph_crc32c_page(crc, page, page_offset, length); - need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret); + need_crc = ceph_msg_data_advance(cursor, (size_t)ret); } dout("%s %p msg %p done\n", __func__, con, msg); @@ -1577,6 +1615,7 @@ static int write_partial_skip(struct ceph_connection *con) { int ret; + dout("%s %p %d left\n", __func__, con, con->out_skip); while (con->out_skip > 0) { size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); @@ -1625,6 +1664,12 @@ static void prepare_read_tag(struct ceph_connection *con) con->in_tag = CEPH_MSGR_TAG_READY; } +static void prepare_read_keepalive_ack(struct ceph_connection *con) +{ + dout("prepare_read_keepalive_ack %p\n", con); + con->in_base_pos = 0; +} + /* * Prepare to read a message. */ @@ -1732,17 +1777,17 @@ static int verify_hello(struct ceph_connection *con) static bool addr_is_blank(struct sockaddr_storage *ss) { + struct in_addr *addr = &((struct sockaddr_in *)ss)->sin_addr; + struct in6_addr *addr6 = &((struct sockaddr_in6 *)ss)->sin6_addr; + switch (ss->ss_family) { case AF_INET: - return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0; + return addr->s_addr == htonl(INADDR_ANY); case AF_INET6: - return - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0; + return ipv6_addr_any(addr6); + default: + return true; } - return false; } static int addr_port(struct sockaddr_storage *ss) @@ -1989,8 +2034,8 @@ static int process_banner(struct ceph_connection *con) static int process_connect(struct ceph_connection *con) { - u64 sup_feat = con->msgr->supported_features; - u64 req_feat = con->msgr->required_features; + u64 sup_feat = from_msgr(con->msgr)->supported_features; + u64 req_feat = from_msgr(con->msgr)->required_features; u64 server_feat = ceph_sanitize_features( le64_to_cpu(con->in_reply.features)); int ret; @@ -2216,7 +2261,7 @@ static int read_partial_msg_data(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; - const bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); struct page *page; size_t page_offset; size_t length; @@ -2230,8 +2275,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = con->in_data_crc; while (cursor->resid) { - page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, - NULL); + page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); if (ret <= 0) { if (do_datacrc) @@ -2242,7 +2286,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = ceph_crc32c_page(crc, page, page_offset, ret); - (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret); + (void) ceph_msg_data_advance(cursor, (size_t)ret); } if (do_datacrc) con->in_data_crc = crc; @@ -2262,7 +2306,7 @@ static int read_partial_message(struct ceph_connection *con) int end; int ret; unsigned int front_len, middle_len, data_len; - bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); u64 seq; u32 crc; @@ -2301,9 +2345,9 @@ static int read_partial_message(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr), seq, con->in_seq + 1); con->in_base_pos = -front_len - middle_len - data_len - - sizeof(m->footer); + sizeof_footer(con); con->in_tag = CEPH_MSGR_TAG_READY; - return 0; + return 1; } else if ((s64)seq - (s64)con->in_seq > 1) { pr_err("read_partial_message bad seq %lld expected %lld\n", seq, con->in_seq + 1); @@ -2322,21 +2366,14 @@ static int read_partial_message(struct ceph_connection *con) return ret; BUG_ON(!con->in_msg ^ skip); - if (con->in_msg && data_len > con->in_msg->data_length) { - pr_warn("%s skipping long message (%u > %zd)\n", - __func__, data_len, con->in_msg->data_length); - ceph_msg_put(con->in_msg); - con->in_msg = NULL; - skip = 1; - } if (skip) { /* skip this message */ dout("alloc_msg said skip message\n"); con->in_base_pos = -front_len - middle_len - data_len - - sizeof(m->footer); + sizeof_footer(con); con->in_tag = CEPH_MSGR_TAG_READY; con->in_seq++; - return 0; + return 1; } BUG_ON(!con->in_msg); @@ -2414,7 +2451,7 @@ static int read_partial_message(struct ceph_connection *con) } if (need_sign && con->ops->check_message_signature && - con->ops->check_message_signature(con, m)) { + con->ops->check_message_signature(m)) { pr_err("read_partial_message %p signature check failed\n", m); return -EBADMSG; } @@ -2429,13 +2466,10 @@ static int read_partial_message(struct ceph_connection *con) */ static void process_message(struct ceph_connection *con) { - struct ceph_msg *msg; + struct ceph_msg *msg = con->in_msg; BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; - msg = con->in_msg; con->in_msg = NULL; - con->ops->put(con); /* if first message, set peer_name */ if (con->peer_name.type == 0) @@ -2457,6 +2491,17 @@ static void process_message(struct ceph_connection *con) mutex_lock(&con->mutex); } +static int read_keepalive_ack(struct ceph_connection *con) +{ + struct ceph_timespec ceph_ts; + size_t size = sizeof(ceph_ts); + int ret = read_partial(con, size, size, &ceph_ts); + if (ret <= 0) + return ret; + ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts); + prepare_read_tag(con); + return 1; +} /* * Write something to the socket. Called in a worker thread when the @@ -2493,13 +2538,13 @@ more: more_kvec: /* kvec data queued? */ - if (con->out_skip) { - ret = write_partial_skip(con); + if (con->out_kvec_left) { + ret = write_partial_kvec(con); if (ret <= 0) goto out; } - if (con->out_kvec_left) { - ret = write_partial_kvec(con); + if (con->out_skip) { + ret = write_partial_skip(con); if (ret <= 0) goto out; } @@ -2526,6 +2571,10 @@ more_kvec: do_next: if (con->state == CON_STATE_OPEN) { + if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { + prepare_write_keepalive(con); + goto more; + } /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2535,10 +2584,6 @@ do_next: prepare_write_ack(con); goto more; } - if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { - prepare_write_keepalive(con); - goto more; - } } /* Nothing to do! */ @@ -2641,6 +2686,9 @@ more: case CEPH_MSGR_TAG_ACK: prepare_read_ack(con); break; + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + prepare_read_keepalive_ack(con); + break; case CEPH_MSGR_TAG_CLOSE: con_close_socket(con); con->state = CON_STATE_CLOSED; @@ -2654,7 +2702,7 @@ more: if (ret <= 0) { switch (ret) { case -EBADMSG: - con->error_msg = "bad crc"; + con->error_msg = "bad crc/signature"; /* fall through */ case -EBADE: ret = -EIO; @@ -2684,6 +2732,12 @@ more: process_ack(con); goto more; } + if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + ret = read_keepalive_ack(con); + if (ret <= 0) + goto out; + goto more; + } out: dout("try_read done on %p ret %d\n", con, ret); @@ -2799,7 +2853,7 @@ static void con_fault_finish(struct ceph_connection *con) /* * Do some work on a connection. Drop a connection ref when we're done. */ -static void con_work(struct work_struct *work) +static void ceph_con_workfn(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); @@ -2889,10 +2943,8 @@ static void con_fault(struct ceph_connection *con) if (con->in_msg) { BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->ops->put(con); } /* Requeue anything that hasn't been acked */ @@ -2923,15 +2975,8 @@ static void con_fault(struct ceph_connection *con) * initialize a new messenger instance */ void ceph_messenger_init(struct ceph_messenger *msgr, - struct ceph_entity_addr *myaddr, - u64 supported_features, - u64 required_features, - bool nocrc, - bool tcp_nodelay) + struct ceph_entity_addr *myaddr) { - msgr->supported_features = supported_features; - msgr->required_features = required_features; - spin_lock_init(&msgr->global_seq_lock); if (myaddr) @@ -2941,15 +2986,29 @@ void ceph_messenger_init(struct ceph_messenger *msgr, msgr->inst.addr.type = 0; get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); encode_my_addr(msgr); - msgr->nocrc = nocrc; - msgr->tcp_nodelay = tcp_nodelay; atomic_set(&msgr->stopping, 0); + write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); dout("%s %p\n", __func__, msgr); } EXPORT_SYMBOL(ceph_messenger_init); +void ceph_messenger_fini(struct ceph_messenger *msgr) +{ + put_net(read_pnet(&msgr->net)); +} +EXPORT_SYMBOL(ceph_messenger_fini); + +static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) +{ + if (msg->con) + msg->con->ops->put(msg->con); + + msg->con = con ? con->ops->get(con) : NULL; + BUG_ON(msg->con != con); +} + static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ @@ -2981,9 +3040,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) return; } - BUG_ON(msg->con != NULL); - msg->con = con->ops->get(con); - BUG_ON(msg->con == NULL); + msg_con_set(msg, con); BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); @@ -3011,31 +3068,45 @@ void ceph_msg_revoke(struct ceph_msg *msg) { struct ceph_connection *con = msg->con; - if (!con) + if (!con) { + dout("%s msg %p null con\n", __func__, msg); return; /* Message not in our possession */ + } mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); - BUG_ON(msg->con == NULL); - msg->con->ops->put(msg->con); - msg->con = NULL; msg->hdr.seq = 0; ceph_msg_put(msg); } if (con->out_msg == msg) { - dout("%s %p msg %p - was sending\n", __func__, con, msg); - con->out_msg = NULL; - if (con->out_kvec_is_msg) { - con->out_skip = con->out_kvec_bytes; - con->out_kvec_is_msg = false; + BUG_ON(con->out_skip); + /* footer */ + if (con->out_msg_done) { + con->out_skip += con_out_kvec_skip(con); + } else { + BUG_ON(!msg->data_length); + if (con->peer_features & CEPH_FEATURE_MSG_AUTH) + con->out_skip += sizeof(msg->footer); + else + con->out_skip += sizeof(msg->old_footer); } + /* data, middle, front */ + if (msg->data_length) + con->out_skip += msg->cursor.total_resid; + if (msg->middle) + con->out_skip += con_out_kvec_skip(con); + con->out_skip += con_out_kvec_skip(con); + + dout("%s %p msg %p - was sending, will write %d skip %d\n", + __func__, con, msg, con->out_kvec_bytes, con->out_skip); msg->hdr.seq = 0; - + con->out_msg = NULL; ceph_msg_put(msg); } + mutex_unlock(&con->mutex); } @@ -3044,16 +3115,13 @@ void ceph_msg_revoke(struct ceph_msg *msg) */ void ceph_msg_revoke_incoming(struct ceph_msg *msg) { - struct ceph_connection *con; + struct ceph_connection *con = msg->con; - BUG_ON(msg == NULL); - if (!msg->con) { + if (!con) { dout("%s msg %p null con\n", __func__, msg); - return; /* Message not in our possession */ } - con = msg->con; mutex_lock(&con->mutex); if (con->in_msg == msg) { unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); @@ -3094,6 +3162,20 @@ void ceph_con_keepalive(struct ceph_connection *con) } EXPORT_SYMBOL(ceph_con_keepalive); +bool ceph_con_keepalive_expired(struct ceph_connection *con, + unsigned long interval) +{ + if (interval > 0 && + (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { + struct timespec now = CURRENT_TIME; + struct timespec ts; + jiffies_to_timespec(interval, &ts); + ts = timespec_add(con->last_keepalive_ack, ts); + return timespec_compare(&now, &ts) >= 0; + } + return false; +} + static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) { struct ceph_msg_data *data; @@ -3285,9 +3367,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) } if (msg) { BUG_ON(*skip); + msg_con_set(msg, con); con->in_msg = msg; - con->in_msg->con = con->ops->get(con); - BUG_ON(con->in_msg->con == NULL); } else { /* * Null message pointer means either we should skip @@ -3334,6 +3415,8 @@ static void ceph_msg_release(struct kref *kref) dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); + msg_con_set(m, NULL); + /* drop middle, data, if any */ if (m->middle) { ceph_buffer_put(m->middle); |