From e09b41010ba33a20a87472ee821fa407a5b8da36 Mon Sep 17 00:00:00 2001 From: José Pekkarinen Date: Mon, 11 Apr 2016 10:41:07 +0300 Subject: These changes are the raw update to linux-4.4.6-rt14. Kernel sources are taken from kernel.org, and rt patch from the rt wiki download page. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During the rebasing, the following patch collided: Force tick interrupt and get rid of softirq magic(I70131fb85). Collisions have been removed because its logic was found on the source already. Change-Id: I7f57a4081d9deaa0d9ccfc41a6c8daccdee3b769 Signed-off-by: José Pekkarinen --- kernel/net/ceph/auth_x.c | 36 ++-- kernel/net/ceph/ceph_common.c | 87 +++++++--- kernel/net/ceph/crush/crush.c | 13 +- kernel/net/ceph/crush/crush_ln_table.h | 32 ++-- kernel/net/ceph/crush/hash.c | 8 +- kernel/net/ceph/crush/mapper.c | 148 +++++++++++------ kernel/net/ceph/crypto.c | 10 +- kernel/net/ceph/crypto.h | 4 +- kernel/net/ceph/messenger.c | 291 +++++++++++++++++++++------------ kernel/net/ceph/mon_client.c | 50 ++++-- kernel/net/ceph/osd_client.c | 140 ++++++++-------- kernel/net/ceph/osdmap.c | 2 +- kernel/net/ceph/pagevec.c | 5 +- 13 files changed, 510 insertions(+), 316 deletions(-) (limited to 'kernel/net/ceph') diff --git a/kernel/net/ceph/auth_x.c b/kernel/net/ceph/auth_x.c index ba6eb1722..10d87753e 100644 --- a/kernel/net/ceph/auth_x.c +++ b/kernel/net/ceph/auth_x.c @@ -8,6 +8,7 @@ #include #include +#include #include #include "crypto.h" @@ -279,6 +280,15 @@ bad: return -EINVAL; } +static void ceph_x_authorizer_cleanup(struct ceph_x_authorizer *au) +{ + ceph_crypto_key_destroy(&au->session_key); + if (au->buf) { + ceph_buffer_put(au->buf); + au->buf = NULL; + } +} + static int ceph_x_build_authorizer(struct ceph_auth_client *ac, struct ceph_x_ticket_handler *th, struct ceph_x_authorizer *au) @@ -297,7 +307,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, ceph_crypto_key_destroy(&au->session_key); ret = ceph_crypto_key_clone(&au->session_key, &th->session_key); if (ret) - return ret; + goto out_au; maxlen = sizeof(*msg_a) + sizeof(msg_b) + ceph_x_encrypt_buflen(ticket_blob_len); @@ -309,8 +319,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, if (!au->buf) { au->buf = ceph_buffer_new(maxlen, GFP_NOFS); if (!au->buf) { - ceph_crypto_key_destroy(&au->session_key); - return -ENOMEM; + ret = -ENOMEM; + goto out_au; } } au->service = th->service; @@ -340,7 +350,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b), p, end - p); if (ret < 0) - goto out_buf; + goto out_au; p += ret; au->buf->vec.iov_len = p - au->buf->vec.iov_base; dout(" built authorizer nonce %llx len %d\n", au->nonce, @@ -348,9 +358,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac, BUG_ON(au->buf->vec.iov_len > maxlen); return 0; -out_buf: - ceph_buffer_put(au->buf); - au->buf = NULL; +out_au: + ceph_x_authorizer_cleanup(au); return ret; } @@ -624,8 +633,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac, { struct ceph_x_authorizer *au = (void *)a; - ceph_crypto_key_destroy(&au->session_key); - ceph_buffer_put(au->buf); + ceph_x_authorizer_cleanup(au); kfree(au); } @@ -653,8 +661,7 @@ static void ceph_x_destroy(struct ceph_auth_client *ac) remove_ticket_handler(ac, th); } - if (xi->auth_authorizer.buf) - ceph_buffer_put(xi->auth_authorizer.buf); + ceph_x_authorizer_cleanup(&xi->auth_authorizer); kfree(ac->private); ac->private = NULL; @@ -691,8 +698,10 @@ static int ceph_x_sign_message(struct ceph_auth_handshake *auth, struct ceph_msg *msg) { int ret; - if (!auth->authorizer) + + if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN)) return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, msg, &msg->footer.sig); if (ret < 0) @@ -707,8 +716,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth, __le64 sig_check; int ret; - if (!auth->authorizer) + if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN)) return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, msg, &sig_check); if (ret < 0) diff --git a/kernel/net/ceph/ceph_common.c b/kernel/net/ceph/ceph_common.c index 3f76eb84b..bcbec33c6 100644 --- a/kernel/net/ceph/ceph_common.c +++ b/kernel/net/ceph/ceph_common.c @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -16,8 +17,6 @@ #include #include #include -#include -#include #include @@ -131,6 +130,13 @@ int ceph_compare_options(struct ceph_options *new_opt, int i; int ret; + /* + * Don't bother comparing options if network namespaces don't + * match. + */ + if (!net_eq(current->nsproxy->net_ns, read_pnet(&client->msgr.net))) + return -1; + ret = memcmp(opt1, opt2, ofs); if (ret) return ret; @@ -239,6 +245,8 @@ enum { Opt_nocrc, Opt_cephx_require_signatures, Opt_nocephx_require_signatures, + Opt_cephx_sign_messages, + Opt_nocephx_sign_messages, Opt_tcp_nodelay, Opt_notcp_nodelay, }; @@ -261,6 +269,8 @@ static match_table_t opt_tokens = { {Opt_nocrc, "nocrc"}, {Opt_cephx_require_signatures, "cephx_require_signatures"}, {Opt_nocephx_require_signatures, "nocephx_require_signatures"}, + {Opt_cephx_sign_messages, "cephx_sign_messages"}, + {Opt_nocephx_sign_messages, "nocephx_sign_messages"}, {Opt_tcp_nodelay, "tcp_nodelay"}, {Opt_notcp_nodelay, "notcp_nodelay"}, {-1, NULL} @@ -312,7 +322,7 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) { goto out; } - ckey = ukey->payload.data; + ckey = ukey->payload.data[0]; err = ceph_crypto_key_clone(dst, ckey); if (err) goto out_key; @@ -335,9 +345,6 @@ ceph_parse_options(char *options, const char *dev_name, int err = -ENOMEM; substring_t argstr[MAX_OPT_ARGS]; - if (current->nsproxy->net_ns != &init_net) - return ERR_PTR(-EINVAL); - opt = kzalloc(sizeof(*opt), GFP_KERNEL); if (!opt) return ERR_PTR(-ENOMEM); @@ -352,8 +359,9 @@ ceph_parse_options(char *options, const char *dev_name, /* start with defaults */ opt->flags = CEPH_OPT_DEFAULT; opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; - opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ - opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ + opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; + opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; + opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT; /* get mon ip(s) */ /* ip1[:port1][,ip2[:port2]...] */ @@ -439,13 +447,32 @@ ceph_parse_options(char *options, const char *dev_name, pr_warn("ignoring deprecated osdtimeout option\n"); break; case Opt_osdkeepalivetimeout: - opt->osd_keepalive_timeout = intval; + /* 0 isn't well defined right now, reject it */ + if (intval < 1 || intval > INT_MAX / 1000) { + pr_err("osdkeepalive out of range\n"); + err = -EINVAL; + goto out; + } + opt->osd_keepalive_timeout = + msecs_to_jiffies(intval * 1000); break; case Opt_osd_idle_ttl: - opt->osd_idle_ttl = intval; + /* 0 isn't well defined right now, reject it */ + if (intval < 1 || intval > INT_MAX / 1000) { + pr_err("osd_idle_ttl out of range\n"); + err = -EINVAL; + goto out; + } + opt->osd_idle_ttl = msecs_to_jiffies(intval * 1000); break; case Opt_mount_timeout: - opt->mount_timeout = intval; + /* 0 is "wait forever" (i.e. infinite timeout) */ + if (intval < 0 || intval > INT_MAX / 1000) { + pr_err("mount_timeout out of range\n"); + err = -EINVAL; + goto out; + } + opt->mount_timeout = msecs_to_jiffies(intval * 1000); break; case Opt_share: @@ -468,6 +495,12 @@ ceph_parse_options(char *options, const char *dev_name, case Opt_nocephx_require_signatures: opt->flags |= CEPH_OPT_NOMSGAUTH; break; + case Opt_cephx_sign_messages: + opt->flags &= ~CEPH_OPT_NOMSGSIGN; + break; + case Opt_nocephx_sign_messages: + opt->flags |= CEPH_OPT_NOMSGSIGN; + break; case Opt_tcp_nodelay: opt->flags |= CEPH_OPT_TCP_NODELAY; @@ -511,16 +544,20 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client) seq_puts(m, "nocrc,"); if (opt->flags & CEPH_OPT_NOMSGAUTH) seq_puts(m, "nocephx_require_signatures,"); + if (opt->flags & CEPH_OPT_NOMSGSIGN) + seq_puts(m, "nocephx_sign_messages,"); if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0) seq_puts(m, "notcp_nodelay,"); if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT) - seq_printf(m, "mount_timeout=%d,", opt->mount_timeout); + seq_printf(m, "mount_timeout=%d,", + jiffies_to_msecs(opt->mount_timeout) / 1000); if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT) - seq_printf(m, "osd_idle_ttl=%d,", opt->osd_idle_ttl); + seq_printf(m, "osd_idle_ttl=%d,", + jiffies_to_msecs(opt->osd_idle_ttl) / 1000); if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT) seq_printf(m, "osdkeepalivetimeout=%d,", - opt->osd_keepalive_timeout); + jiffies_to_msecs(opt->osd_keepalive_timeout) / 1000); /* drop redundant comma */ if (m->count != pos) @@ -571,11 +608,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, if (ceph_test_opt(client, MYIP)) myaddr = &client->options->my_addr; - ceph_messenger_init(&client->msgr, myaddr, - client->supported_features, - client->required_features, - ceph_test_opt(client, NOCRC), - ceph_test_opt(client, TCP_NODELAY)); + ceph_messenger_init(&client->msgr, myaddr); /* subsystems */ err = ceph_monc_init(&client->monc, client); @@ -590,6 +623,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private, fail_monc: ceph_monc_stop(&client->monc); fail: + ceph_messenger_fini(&client->msgr); kfree(client); return ERR_PTR(err); } @@ -603,8 +637,8 @@ void ceph_destroy_client(struct ceph_client *client) /* unmount */ ceph_osdc_stop(&client->osdc); - ceph_monc_stop(&client->monc); + ceph_messenger_fini(&client->msgr); ceph_debugfs_client_cleanup(client); @@ -629,8 +663,8 @@ static int have_mon_and_osd_map(struct ceph_client *client) */ int __ceph_open_session(struct ceph_client *client, unsigned long started) { - int err; - unsigned long timeout = client->options->mount_timeout * HZ; + unsigned long timeout = client->options->mount_timeout; + long err; /* open session, and wait for mon and osd maps */ err = ceph_monc_open_session(&client->monc); @@ -638,16 +672,15 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started) return err; while (!have_mon_and_osd_map(client)) { - err = -EIO; if (timeout && time_after_eq(jiffies, started + timeout)) - return err; + return -ETIMEDOUT; /* wait */ dout("mount waiting for mon_map\n"); err = wait_event_interruptible_timeout(client->auth_wq, have_mon_and_osd_map(client) || (client->auth_err < 0), - timeout); - if (err == -EINTR || err == -ERESTARTSYS) + ceph_timeout_jiffies(timeout)); + if (err < 0) return err; if (client->auth_err < 0) return client->auth_err; @@ -724,5 +757,5 @@ module_exit(exit_ceph_lib); MODULE_AUTHOR("Sage Weil "); MODULE_AUTHOR("Yehuda Sadeh "); MODULE_AUTHOR("Patience Warnick "); -MODULE_DESCRIPTION("Ceph filesystem for Linux"); +MODULE_DESCRIPTION("Ceph core library"); MODULE_LICENSE("GPL"); diff --git a/kernel/net/ceph/crush/crush.c b/kernel/net/ceph/crush/crush.c index 9d84ce4ea..80d7c3a97 100644 --- a/kernel/net/ceph/crush/crush.c +++ b/kernel/net/ceph/crush/crush.c @@ -1,15 +1,11 @@ - #ifdef __KERNEL__ # include +# include #else -# include -# include -# define kfree(x) do { if (x) free(x); } while (0) -# define BUG_ON(x) assert(!(x)) +# include "crush_compat.h" +# include "crush.h" #endif -#include - const char *crush_bucket_alg_name(int alg) { switch (alg) { @@ -134,6 +130,9 @@ void crush_destroy(struct crush_map *map) kfree(map->rules); } +#ifndef __KERNEL__ + kfree(map->choose_tries); +#endif kfree(map); } diff --git a/kernel/net/ceph/crush/crush_ln_table.h b/kernel/net/ceph/crush/crush_ln_table.h index 6192c7fc9..aae534c90 100644 --- a/kernel/net/ceph/crush/crush_ln_table.h +++ b/kernel/net/ceph/crush/crush_ln_table.h @@ -10,20 +10,20 @@ * */ -#if defined(__linux__) -#include -#elif defined(__FreeBSD__) -#include -#endif - #ifndef CEPH_CRUSH_LN_H #define CEPH_CRUSH_LN_H +#ifdef __KERNEL__ +# include +#else +# include "crush_compat.h" +#endif -// RH_LH_tbl[2*k] = 2^48/(1.0+k/128.0) -// RH_LH_tbl[2*k+1] = 2^48*log2(1.0+k/128.0) - -static int64_t __RH_LH_tbl[128*2+2] = { +/* + * RH_LH_tbl[2*k] = 2^48/(1.0+k/128.0) + * RH_LH_tbl[2*k+1] = 2^48*log2(1.0+k/128.0) + */ +static __s64 __RH_LH_tbl[128*2+2] = { 0x0001000000000000ll, 0x0000000000000000ll, 0x0000fe03f80fe040ll, 0x000002dfca16dde1ll, 0x0000fc0fc0fc0fc1ll, 0x000005b9e5a170b4ll, 0x0000fa232cf25214ll, 0x0000088e68ea899all, 0x0000f83e0f83e0f9ll, 0x00000b5d69bac77ell, 0x0000f6603d980f67ll, 0x00000e26fd5c8555ll, @@ -89,11 +89,12 @@ static int64_t __RH_LH_tbl[128*2+2] = { 0x0000820820820821ll, 0x0000fa2f045e7832ll, 0x000081848da8faf1ll, 0x0000fba577877d7dll, 0x0000810204081021ll, 0x0000fd1a708bbe11ll, 0x0000808080808081ll, 0x0000fe8df263f957ll, 0x0000800000000000ll, 0x0000ffff00000000ll, - }; - +}; - // LL_tbl[k] = 2^48*log2(1.0+k/2^15); -static int64_t __LL_tbl[256] = { +/* + * LL_tbl[k] = 2^48*log2(1.0+k/2^15) + */ +static __s64 __LL_tbl[256] = { 0x0000000000000000ull, 0x00000002e2a60a00ull, 0x000000070cb64ec5ull, 0x00000009ef50ce67ull, 0x0000000cd1e588fdull, 0x0000000fb4747e9cull, 0x0000001296fdaf5eull, 0x0000001579811b58ull, 0x000000185bfec2a1ull, 0x0000001b3e76a552ull, 0x0000001e20e8c380ull, 0x0000002103551d43ull, @@ -160,7 +161,4 @@ static int64_t __LL_tbl[256] = { 0x000002d4562d2ec6ull, 0x000002d73330209dull, 0x000002da102d63b0ull, 0x000002dced24f814ull, }; - - - #endif diff --git a/kernel/net/ceph/crush/hash.c b/kernel/net/ceph/crush/hash.c index 5bb63e37a..ed123af49 100644 --- a/kernel/net/ceph/crush/hash.c +++ b/kernel/net/ceph/crush/hash.c @@ -1,6 +1,8 @@ - -#include -#include +#ifdef __KERNEL__ +# include +#else +# include "hash.h" +#endif /* * Robert Jenkins' function for mixing 32-bit values diff --git a/kernel/net/ceph/crush/mapper.c b/kernel/net/ceph/crush/mapper.c index 5b47736d2..393bfb22d 100644 --- a/kernel/net/ceph/crush/mapper.c +++ b/kernel/net/ceph/crush/mapper.c @@ -1,27 +1,31 @@ +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2015 Intel Corporation All Rights Reserved + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ #ifdef __KERNEL__ # include # include # include # include -# ifndef dprintk -# define dprintk(args...) -# endif +# include +# include #else -# include -# include -# include -# include -# define BUG_ON(x) assert(!(x)) -# define dprintk(args...) /* printf(args) */ -# define kmalloc(x, f) malloc(x) -# define kfree(x) free(x) +# include "crush_compat.h" +# include "crush.h" +# include "hash.h" #endif - -#include -#include #include "crush_ln_table.h" +#define dprintk(args...) /* printf(args) */ + /* * Implement the core CRUSH mapping algorithm. */ @@ -139,7 +143,7 @@ static int bucket_list_choose(struct crush_bucket_list *bucket, int i; for (i = bucket->h.size-1; i >= 0; i--) { - __u64 w = crush_hash32_4(bucket->h.hash,x, bucket->h.items[i], + __u64 w = crush_hash32_4(bucket->h.hash, x, bucket->h.items[i], r, bucket->h.id); w &= 0xffff; dprintk("list_choose i=%d x=%d r=%d item %d weight %x " @@ -238,43 +242,46 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket, return bucket->h.items[high]; } -// compute 2^44*log2(input+1) -uint64_t crush_ln(unsigned xin) +/* compute 2^44*log2(input+1) */ +static __u64 crush_ln(unsigned int xin) { - unsigned x=xin, x1; - int iexpon, index1, index2; - uint64_t RH, LH, LL, xl64, result; + unsigned int x = xin, x1; + int iexpon, index1, index2; + __u64 RH, LH, LL, xl64, result; - x++; + x++; - // normalize input - iexpon = 15; - while(!(x&0x18000)) { x<<=1; iexpon--; } + /* normalize input */ + iexpon = 15; + while (!(x & 0x18000)) { + x <<= 1; + iexpon--; + } - index1 = (x>>8)<<1; - // RH ~ 2^56/index1 - RH = __RH_LH_tbl[index1 - 256]; - // LH ~ 2^48 * log2(index1/256) - LH = __RH_LH_tbl[index1 + 1 - 256]; + index1 = (x >> 8) << 1; + /* RH ~ 2^56/index1 */ + RH = __RH_LH_tbl[index1 - 256]; + /* LH ~ 2^48 * log2(index1/256) */ + LH = __RH_LH_tbl[index1 + 1 - 256]; - // RH*x ~ 2^48 * (2^15 + xf), xf<2^8 - xl64 = (int64_t)x * RH; - xl64 >>= 48; - x1 = xl64; + /* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */ + xl64 = (__s64)x * RH; + xl64 >>= 48; + x1 = xl64; - result = iexpon; - result <<= (12 + 32); + result = iexpon; + result <<= (12 + 32); - index2 = x1 & 0xff; - // LL ~ 2^48*log2(1.0+index2/2^15) - LL = __LL_tbl[index2]; + index2 = x1 & 0xff; + /* LL ~ 2^48*log2(1.0+index2/2^15) */ + LL = __LL_tbl[index2]; - LH = LH + LL; + LH = LH + LL; - LH >>= (48-12 - 32); - result += LH; + LH >>= (48 - 12 - 32); + result += LH; - return result; + return result; } @@ -290,9 +297,9 @@ uint64_t crush_ln(unsigned xin) static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket, int x, int r) { - unsigned i, high = 0; - unsigned u; - unsigned w; + unsigned int i, high = 0; + unsigned int u; + unsigned int w; __s64 ln, draw, high_draw = 0; for (i = 0; i < bucket->h.size; i++) { @@ -567,6 +574,10 @@ reject: out[outpos] = item; outpos++; count--; +#ifndef __KERNEL__ + if (map->choose_tries && ftotal <= map->choose_total_tries) + map->choose_tries[ftotal]++; +#endif } dprintk("CHOOSE returns %d\n", outpos); @@ -610,6 +621,20 @@ static void crush_choose_indep(const struct crush_map *map, } for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) { +#ifdef DEBUG_INDEP + if (out2 && ftotal) { + dprintk("%u %d a: ", ftotal, left); + for (rep = outpos; rep < endpos; rep++) { + dprintk(" %d", out[rep]); + } + dprintk("\n"); + dprintk("%u %d b: ", ftotal, left); + for (rep = outpos; rep < endpos; rep++) { + dprintk(" %d", out2[rep]); + } + dprintk("\n"); + } +#endif for (rep = outpos; rep < endpos; rep++) { if (out[rep] != CRUSH_ITEM_UNDEF) continue; @@ -726,6 +751,24 @@ static void crush_choose_indep(const struct crush_map *map, out2[rep] = CRUSH_ITEM_NONE; } } +#ifndef __KERNEL__ + if (map->choose_tries && ftotal <= map->choose_total_tries) + map->choose_tries[ftotal]++; +#endif +#ifdef DEBUG_INDEP + if (out2) { + dprintk("%u %d a: ", ftotal, left); + for (rep = outpos; rep < endpos; rep++) { + dprintk(" %d", out[rep]); + } + dprintk("\n"); + dprintk("%u %d b: ", ftotal, left); + for (rep = outpos; rep < endpos; rep++) { + dprintk(" %d", out2[rep]); + } + dprintk("\n"); + } +#endif } /** @@ -790,8 +833,15 @@ int crush_do_rule(const struct crush_map *map, switch (curstep->op) { case CRUSH_RULE_TAKE: - w[0] = curstep->arg1; - wsize = 1; + if ((curstep->arg1 >= 0 && + curstep->arg1 < map->max_devices) || + (-1-curstep->arg1 < map->max_buckets && + map->buckets[-1-curstep->arg1])) { + w[0] = curstep->arg1; + wsize = 1; + } else { + dprintk(" bad take value %d\n", curstep->arg1); + } break; case CRUSH_RULE_SET_CHOOSE_TRIES: @@ -877,7 +927,7 @@ int crush_do_rule(const struct crush_map *map, 0); } else { out_size = ((numrep < (result_max-osize)) ? - numrep : (result_max-osize)); + numrep : (result_max-osize)); crush_choose_indep( map, map->buckets[-1-w[i]], @@ -923,5 +973,3 @@ int crush_do_rule(const struct crush_map *map, } return result_len; } - - diff --git a/kernel/net/ceph/crypto.c b/kernel/net/ceph/crypto.c index 790fe89d9..42e8649c6 100644 --- a/kernel/net/ceph/crypto.c +++ b/kernel/net/ceph/crypto.c @@ -79,10 +79,6 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey) return 0; } - - -#define AES_KEY_SIZE 16 - static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void) { return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC); @@ -541,7 +537,7 @@ static int ceph_key_preparse(struct key_preparsed_payload *prep) if (ret < 0) goto err_ckey; - prep->payload[0] = ckey; + prep->payload.data[0] = ckey; prep->quotalen = datalen; return 0; @@ -553,14 +549,14 @@ err: static void ceph_key_free_preparse(struct key_preparsed_payload *prep) { - struct ceph_crypto_key *ckey = prep->payload[0]; + struct ceph_crypto_key *ckey = prep->payload.data[0]; ceph_crypto_key_destroy(ckey); kfree(ckey); } static void ceph_key_destroy(struct key *key) { - struct ceph_crypto_key *ckey = key->payload.data; + struct ceph_crypto_key *ckey = key->payload.data[0]; ceph_crypto_key_destroy(ckey); kfree(ckey); diff --git a/kernel/net/ceph/crypto.h b/kernel/net/ceph/crypto.h index d1498224c..2e9cab09f 100644 --- a/kernel/net/ceph/crypto.h +++ b/kernel/net/ceph/crypto.h @@ -16,8 +16,10 @@ struct ceph_crypto_key { static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key) { - if (key) + if (key) { kfree(key->key); + key->key = NULL; + } } int ceph_crypto_key_clone(struct ceph_crypto_key *dst, diff --git a/kernel/net/ceph/messenger.c b/kernel/net/ceph/messenger.c index 967080a9f..63ae5dd24 100644 --- a/kernel/net/ceph/messenger.c +++ b/kernel/net/ceph/messenger.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -162,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; +static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; #ifdef CONFIG_LOCKDEP static struct lock_class_key socket_class; @@ -175,7 +177,7 @@ static struct lock_class_key socket_class; static void queue_con(struct ceph_connection *con); static void cancel_con(struct ceph_connection *con); -static void con_work(struct work_struct *); +static void ceph_con_workfn(struct work_struct *); static void con_fault(struct ceph_connection *con); /* @@ -275,23 +277,22 @@ static void _ceph_msgr_exit(void) ceph_msgr_wq = NULL; } - ceph_msgr_slab_exit(); - BUG_ON(zero_page == NULL); - kunmap(zero_page); page_cache_release(zero_page); zero_page = NULL; + + ceph_msgr_slab_exit(); } int ceph_msgr_init(void) { + if (ceph_msgr_slab_init()) + return -ENOMEM; + BUG_ON(zero_page != NULL); zero_page = ZERO_PAGE(0); page_cache_get(zero_page); - if (ceph_msgr_slab_init()) - return -ENOMEM; - /* * The number of active work items is limited by the number of * connections, so leave @max_active at default. @@ -480,8 +481,8 @@ static int ceph_tcp_connect(struct ceph_connection *con) int ret; BUG_ON(con->sock); - ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM, - IPPROTO_TCP, &sock); + ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family, + SOCK_STREAM, IPPROTO_TCP, &sock); if (ret) return ret; sock->sk->sk_allocation = GFP_NOFS; @@ -508,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con) return ret; } - if (con->msgr->tcp_nodelay) { + if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) { int optval = 1; ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, @@ -636,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con) static void ceph_msg_remove(struct ceph_msg *msg) { list_del_init(&msg->list_head); - BUG_ON(msg->con == NULL); - msg->con->ops->put(msg->con); - msg->con = NULL; ceph_msg_put(msg); } @@ -661,20 +659,21 @@ static void reset_connection(struct ceph_connection *con) if (con->in_msg) { BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->ops->put(con); } con->connect_seq = 0; con->out_seq = 0; if (con->out_msg) { + BUG_ON(con->out_msg->con != con); ceph_msg_put(con->out_msg); con->out_msg = NULL; } con->in_seq = 0; con->in_seq_acked = 0; + + con->out_skip = 0; } /* @@ -749,7 +748,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); - INIT_DELAYED_WORK(&con->work, con_work); + INIT_DELAYED_WORK(&con->work, ceph_con_workfn); con->state = CON_STATE_CLOSED; } @@ -774,6 +773,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt) static void con_out_kvec_reset(struct ceph_connection *con) { + BUG_ON(con->out_skip); + con->out_kvec_left = 0; con->out_kvec_bytes = 0; con->out_kvec_cur = &con->out_kvec[0]; @@ -782,9 +783,9 @@ static void con_out_kvec_reset(struct ceph_connection *con) static void con_out_kvec_add(struct ceph_connection *con, size_t size, void *data) { - int index; + int index = con->out_kvec_left; - index = con->out_kvec_left; + BUG_ON(con->out_skip); BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); con->out_kvec[index].iov_len = size; @@ -793,6 +794,27 @@ static void con_out_kvec_add(struct ceph_connection *con, con->out_kvec_bytes += size; } +/* + * Chop off a kvec from the end. Return residual number of bytes for + * that kvec, i.e. how many bytes would have been written if the kvec + * hadn't been nuked. + */ +static int con_out_kvec_skip(struct ceph_connection *con) +{ + int off = con->out_kvec_cur - con->out_kvec; + int skip = 0; + + if (con->out_kvec_bytes > 0) { + skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; + BUG_ON(con->out_kvec_bytes < skip); + BUG_ON(!con->out_kvec_left); + con->out_kvec_bytes -= skip; + con->out_kvec_left--; + } + + return skip; +} + #ifdef CONFIG_BLOCK /* @@ -1178,6 +1200,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, return new_piece; } +static size_t sizeof_footer(struct ceph_connection *con) +{ + return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? + sizeof(struct ceph_msg_footer) : + sizeof(struct ceph_msg_footer_old); +} + static void prepare_message_data(struct ceph_msg *msg, u32 data_len) { BUG_ON(!msg); @@ -1200,11 +1229,10 @@ static void prepare_write_message_footer(struct ceph_connection *con) m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; dout("prepare_write_message_footer %p\n", con); - con->out_kvec_is_msg = true; con->out_kvec[v].iov_base = &m->footer; if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { if (con->ops->sign_message) - con->ops->sign_message(con, m); + con->ops->sign_message(m); else m->footer.sig = 0; con->out_kvec[v].iov_len = sizeof(m->footer); @@ -1228,7 +1256,6 @@ static void prepare_write_message(struct ceph_connection *con) u32 crc; con_out_kvec_reset(con); - con->out_kvec_is_msg = true; con->out_msg_done = false; /* Sneak an ack in there first? If we can get it into the same @@ -1268,18 +1295,19 @@ static void prepare_write_message(struct ceph_connection *con) /* tag + hdr + front + middle */ con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); - con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); + con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); if (m->middle) con_out_kvec_add(con, m->middle->vec.iov_len, m->middle->vec.iov_base); - /* fill in crc (except data pages), footer */ + /* fill in hdr crc and finalize hdr */ crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); con->out_msg->hdr.crc = cpu_to_le32(crc); - con->out_msg->footer.flags = 0; + memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); + /* fill in front and middle crc, footer */ crc = crc32c(0, m->front.iov_base, m->front.iov_len); con->out_msg->footer.front_crc = cpu_to_le32(crc); if (m->middle) { @@ -1291,6 +1319,7 @@ static void prepare_write_message(struct ceph_connection *con) dout("%s front_crc %u middle_crc %u\n", __func__, le32_to_cpu(con->out_msg->footer.front_crc), le32_to_cpu(con->out_msg->footer.middle_crc)); + con->out_msg->footer.flags = 0; /* is there a data payload? */ con->out_msg->footer.data_crc = 0; @@ -1351,7 +1380,16 @@ static void prepare_write_keepalive(struct ceph_connection *con) { dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); - con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); + if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { + struct timespec now = CURRENT_TIME; + + con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); + ceph_encode_timespec(&con->out_temp_keepalive2, &now); + con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), + &con->out_temp_keepalive2); + } else { + con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); + } con_flag_set(con, CON_FLAG_WRITE_PENDING); } @@ -1422,7 +1460,8 @@ static int prepare_write_connect(struct ceph_connection *con) dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, con->connect_seq, global_seq, proto); - con->out_connect.features = cpu_to_le64(con->msgr->supported_features); + con->out_connect.features = + cpu_to_le64(from_msgr(con->msgr)->supported_features); con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); con->out_connect.global_seq = cpu_to_le32(global_seq); @@ -1485,7 +1524,6 @@ static int write_partial_kvec(struct ceph_connection *con) } } con->out_kvec_left = 0; - con->out_kvec_is_msg = false; ret = 1; out: dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, @@ -1517,7 +1555,7 @@ static int write_partial_message_data(struct ceph_connection *con) { struct ceph_msg *msg = con->out_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; - bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); u32 crc; dout("%s %p msg %p\n", __func__, con, msg); @@ -1542,10 +1580,10 @@ static int write_partial_message_data(struct ceph_connection *con) bool need_crc; int ret; - page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, - &last_piece); + page = ceph_msg_data_next(cursor, &page_offset, &length, + &last_piece); ret = ceph_tcp_sendpage(con->sock, page, page_offset, - length, last_piece); + length, !last_piece); if (ret <= 0) { if (do_datacrc) msg->footer.data_crc = cpu_to_le32(crc); @@ -1554,7 +1592,7 @@ static int write_partial_message_data(struct ceph_connection *con) } if (do_datacrc && cursor->need_crc) crc = ceph_crc32c_page(crc, page, page_offset, length); - need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret); + need_crc = ceph_msg_data_advance(cursor, (size_t)ret); } dout("%s %p msg %p done\n", __func__, con, msg); @@ -1577,6 +1615,7 @@ static int write_partial_skip(struct ceph_connection *con) { int ret; + dout("%s %p %d left\n", __func__, con, con->out_skip); while (con->out_skip > 0) { size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); @@ -1625,6 +1664,12 @@ static void prepare_read_tag(struct ceph_connection *con) con->in_tag = CEPH_MSGR_TAG_READY; } +static void prepare_read_keepalive_ack(struct ceph_connection *con) +{ + dout("prepare_read_keepalive_ack %p\n", con); + con->in_base_pos = 0; +} + /* * Prepare to read a message. */ @@ -1732,17 +1777,17 @@ static int verify_hello(struct ceph_connection *con) static bool addr_is_blank(struct sockaddr_storage *ss) { + struct in_addr *addr = &((struct sockaddr_in *)ss)->sin_addr; + struct in6_addr *addr6 = &((struct sockaddr_in6 *)ss)->sin6_addr; + switch (ss->ss_family) { case AF_INET: - return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0; + return addr->s_addr == htonl(INADDR_ANY); case AF_INET6: - return - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 && - ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0; + return ipv6_addr_any(addr6); + default: + return true; } - return false; } static int addr_port(struct sockaddr_storage *ss) @@ -1989,8 +2034,8 @@ static int process_banner(struct ceph_connection *con) static int process_connect(struct ceph_connection *con) { - u64 sup_feat = con->msgr->supported_features; - u64 req_feat = con->msgr->required_features; + u64 sup_feat = from_msgr(con->msgr)->supported_features; + u64 req_feat = from_msgr(con->msgr)->required_features; u64 server_feat = ceph_sanitize_features( le64_to_cpu(con->in_reply.features)); int ret; @@ -2216,7 +2261,7 @@ static int read_partial_msg_data(struct ceph_connection *con) { struct ceph_msg *msg = con->in_msg; struct ceph_msg_data_cursor *cursor = &msg->cursor; - const bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); struct page *page; size_t page_offset; size_t length; @@ -2230,8 +2275,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = con->in_data_crc; while (cursor->resid) { - page = ceph_msg_data_next(&msg->cursor, &page_offset, &length, - NULL); + page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); if (ret <= 0) { if (do_datacrc) @@ -2242,7 +2286,7 @@ static int read_partial_msg_data(struct ceph_connection *con) if (do_datacrc) crc = ceph_crc32c_page(crc, page, page_offset, ret); - (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret); + (void) ceph_msg_data_advance(cursor, (size_t)ret); } if (do_datacrc) con->in_data_crc = crc; @@ -2262,7 +2306,7 @@ static int read_partial_message(struct ceph_connection *con) int end; int ret; unsigned int front_len, middle_len, data_len; - bool do_datacrc = !con->msgr->nocrc; + bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); u64 seq; u32 crc; @@ -2301,9 +2345,9 @@ static int read_partial_message(struct ceph_connection *con) ceph_pr_addr(&con->peer_addr.in_addr), seq, con->in_seq + 1); con->in_base_pos = -front_len - middle_len - data_len - - sizeof(m->footer); + sizeof_footer(con); con->in_tag = CEPH_MSGR_TAG_READY; - return 0; + return 1; } else if ((s64)seq - (s64)con->in_seq > 1) { pr_err("read_partial_message bad seq %lld expected %lld\n", seq, con->in_seq + 1); @@ -2322,21 +2366,14 @@ static int read_partial_message(struct ceph_connection *con) return ret; BUG_ON(!con->in_msg ^ skip); - if (con->in_msg && data_len > con->in_msg->data_length) { - pr_warn("%s skipping long message (%u > %zd)\n", - __func__, data_len, con->in_msg->data_length); - ceph_msg_put(con->in_msg); - con->in_msg = NULL; - skip = 1; - } if (skip) { /* skip this message */ dout("alloc_msg said skip message\n"); con->in_base_pos = -front_len - middle_len - data_len - - sizeof(m->footer); + sizeof_footer(con); con->in_tag = CEPH_MSGR_TAG_READY; con->in_seq++; - return 0; + return 1; } BUG_ON(!con->in_msg); @@ -2414,7 +2451,7 @@ static int read_partial_message(struct ceph_connection *con) } if (need_sign && con->ops->check_message_signature && - con->ops->check_message_signature(con, m)) { + con->ops->check_message_signature(m)) { pr_err("read_partial_message %p signature check failed\n", m); return -EBADMSG; } @@ -2429,13 +2466,10 @@ static int read_partial_message(struct ceph_connection *con) */ static void process_message(struct ceph_connection *con) { - struct ceph_msg *msg; + struct ceph_msg *msg = con->in_msg; BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; - msg = con->in_msg; con->in_msg = NULL; - con->ops->put(con); /* if first message, set peer_name */ if (con->peer_name.type == 0) @@ -2457,6 +2491,17 @@ static void process_message(struct ceph_connection *con) mutex_lock(&con->mutex); } +static int read_keepalive_ack(struct ceph_connection *con) +{ + struct ceph_timespec ceph_ts; + size_t size = sizeof(ceph_ts); + int ret = read_partial(con, size, size, &ceph_ts); + if (ret <= 0) + return ret; + ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts); + prepare_read_tag(con); + return 1; +} /* * Write something to the socket. Called in a worker thread when the @@ -2493,13 +2538,13 @@ more: more_kvec: /* kvec data queued? */ - if (con->out_skip) { - ret = write_partial_skip(con); + if (con->out_kvec_left) { + ret = write_partial_kvec(con); if (ret <= 0) goto out; } - if (con->out_kvec_left) { - ret = write_partial_kvec(con); + if (con->out_skip) { + ret = write_partial_skip(con); if (ret <= 0) goto out; } @@ -2526,6 +2571,10 @@ more_kvec: do_next: if (con->state == CON_STATE_OPEN) { + if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { + prepare_write_keepalive(con); + goto more; + } /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2535,10 +2584,6 @@ do_next: prepare_write_ack(con); goto more; } - if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { - prepare_write_keepalive(con); - goto more; - } } /* Nothing to do! */ @@ -2641,6 +2686,9 @@ more: case CEPH_MSGR_TAG_ACK: prepare_read_ack(con); break; + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + prepare_read_keepalive_ack(con); + break; case CEPH_MSGR_TAG_CLOSE: con_close_socket(con); con->state = CON_STATE_CLOSED; @@ -2654,7 +2702,7 @@ more: if (ret <= 0) { switch (ret) { case -EBADMSG: - con->error_msg = "bad crc"; + con->error_msg = "bad crc/signature"; /* fall through */ case -EBADE: ret = -EIO; @@ -2684,6 +2732,12 @@ more: process_ack(con); goto more; } + if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + ret = read_keepalive_ack(con); + if (ret <= 0) + goto out; + goto more; + } out: dout("try_read done on %p ret %d\n", con, ret); @@ -2799,7 +2853,7 @@ static void con_fault_finish(struct ceph_connection *con) /* * Do some work on a connection. Drop a connection ref when we're done. */ -static void con_work(struct work_struct *work) +static void ceph_con_workfn(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); @@ -2889,10 +2943,8 @@ static void con_fault(struct ceph_connection *con) if (con->in_msg) { BUG_ON(con->in_msg->con != con); - con->in_msg->con = NULL; ceph_msg_put(con->in_msg); con->in_msg = NULL; - con->ops->put(con); } /* Requeue anything that hasn't been acked */ @@ -2923,15 +2975,8 @@ static void con_fault(struct ceph_connection *con) * initialize a new messenger instance */ void ceph_messenger_init(struct ceph_messenger *msgr, - struct ceph_entity_addr *myaddr, - u64 supported_features, - u64 required_features, - bool nocrc, - bool tcp_nodelay) + struct ceph_entity_addr *myaddr) { - msgr->supported_features = supported_features; - msgr->required_features = required_features; - spin_lock_init(&msgr->global_seq_lock); if (myaddr) @@ -2941,15 +2986,29 @@ void ceph_messenger_init(struct ceph_messenger *msgr, msgr->inst.addr.type = 0; get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce)); encode_my_addr(msgr); - msgr->nocrc = nocrc; - msgr->tcp_nodelay = tcp_nodelay; atomic_set(&msgr->stopping, 0); + write_pnet(&msgr->net, get_net(current->nsproxy->net_ns)); dout("%s %p\n", __func__, msgr); } EXPORT_SYMBOL(ceph_messenger_init); +void ceph_messenger_fini(struct ceph_messenger *msgr) +{ + put_net(read_pnet(&msgr->net)); +} +EXPORT_SYMBOL(ceph_messenger_fini); + +static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) +{ + if (msg->con) + msg->con->ops->put(msg->con); + + msg->con = con ? con->ops->get(con) : NULL; + BUG_ON(msg->con != con); +} + static void clear_standby(struct ceph_connection *con) { /* come back from STANDBY? */ @@ -2981,9 +3040,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) return; } - BUG_ON(msg->con != NULL); - msg->con = con->ops->get(con); - BUG_ON(msg->con == NULL); + msg_con_set(msg, con); BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); @@ -3011,31 +3068,45 @@ void ceph_msg_revoke(struct ceph_msg *msg) { struct ceph_connection *con = msg->con; - if (!con) + if (!con) { + dout("%s msg %p null con\n", __func__, msg); return; /* Message not in our possession */ + } mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { dout("%s %p msg %p - was on queue\n", __func__, con, msg); list_del_init(&msg->list_head); - BUG_ON(msg->con == NULL); - msg->con->ops->put(msg->con); - msg->con = NULL; msg->hdr.seq = 0; ceph_msg_put(msg); } if (con->out_msg == msg) { - dout("%s %p msg %p - was sending\n", __func__, con, msg); - con->out_msg = NULL; - if (con->out_kvec_is_msg) { - con->out_skip = con->out_kvec_bytes; - con->out_kvec_is_msg = false; + BUG_ON(con->out_skip); + /* footer */ + if (con->out_msg_done) { + con->out_skip += con_out_kvec_skip(con); + } else { + BUG_ON(!msg->data_length); + if (con->peer_features & CEPH_FEATURE_MSG_AUTH) + con->out_skip += sizeof(msg->footer); + else + con->out_skip += sizeof(msg->old_footer); } + /* data, middle, front */ + if (msg->data_length) + con->out_skip += msg->cursor.total_resid; + if (msg->middle) + con->out_skip += con_out_kvec_skip(con); + con->out_skip += con_out_kvec_skip(con); + + dout("%s %p msg %p - was sending, will write %d skip %d\n", + __func__, con, msg, con->out_kvec_bytes, con->out_skip); msg->hdr.seq = 0; - + con->out_msg = NULL; ceph_msg_put(msg); } + mutex_unlock(&con->mutex); } @@ -3044,16 +3115,13 @@ void ceph_msg_revoke(struct ceph_msg *msg) */ void ceph_msg_revoke_incoming(struct ceph_msg *msg) { - struct ceph_connection *con; + struct ceph_connection *con = msg->con; - BUG_ON(msg == NULL); - if (!msg->con) { + if (!con) { dout("%s msg %p null con\n", __func__, msg); - return; /* Message not in our possession */ } - con = msg->con; mutex_lock(&con->mutex); if (con->in_msg == msg) { unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); @@ -3094,6 +3162,20 @@ void ceph_con_keepalive(struct ceph_connection *con) } EXPORT_SYMBOL(ceph_con_keepalive); +bool ceph_con_keepalive_expired(struct ceph_connection *con, + unsigned long interval) +{ + if (interval > 0 && + (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { + struct timespec now = CURRENT_TIME; + struct timespec ts; + jiffies_to_timespec(interval, &ts); + ts = timespec_add(con->last_keepalive_ack, ts); + return timespec_compare(&now, &ts) >= 0; + } + return false; +} + static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) { struct ceph_msg_data *data; @@ -3285,9 +3367,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip) } if (msg) { BUG_ON(*skip); + msg_con_set(msg, con); con->in_msg = msg; - con->in_msg->con = con->ops->get(con); - BUG_ON(con->in_msg->con == NULL); } else { /* * Null message pointer means either we should skip @@ -3334,6 +3415,8 @@ static void ceph_msg_release(struct kref *kref) dout("%s %p\n", __func__, m); WARN_ON(!list_empty(&m->list_head)); + msg_con_set(m, NULL); + /* drop middle, data, if any */ if (m->middle) { ceph_buffer_put(m->middle); diff --git a/kernel/net/ceph/mon_client.c b/kernel/net/ceph/mon_client.c index 2b3cf05e8..edda01626 100644 --- a/kernel/net/ceph/mon_client.c +++ b/kernel/net/ceph/mon_client.c @@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc) CEPH_ENTITY_TYPE_MON, monc->cur_mon, &monc->monmap->mon_inst[monc->cur_mon].addr); + /* send an initial keepalive to ensure our timestamp is + * valid by the time we are in an OPENED state */ + ceph_con_keepalive(&monc->con); + /* initiatiate authentication handshake */ ret = ceph_auth_build_hello(monc->auth, monc->m_auth->front.iov_base, @@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc) */ static void __schedule_delayed(struct ceph_mon_client *monc) { - unsigned int delay; + struct ceph_options *opt = monc->client->options; + unsigned long delay; - if (monc->cur_mon < 0 || __sub_expired(monc)) + if (monc->cur_mon < 0 || __sub_expired(monc)) { delay = 10 * HZ; - else + } else { delay = 20 * HZ; - dout("__schedule_delayed after %u\n", delay); - schedule_delayed_work(&monc->delayed_work, delay); + if (opt->monc_ping_timeout > 0) + delay = min(delay, opt->monc_ping_timeout / 3); + } + dout("__schedule_delayed after %lu\n", delay); + schedule_delayed_work(&monc->delayed_work, + round_jiffies_relative(delay)); } /* @@ -298,21 +307,28 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) } EXPORT_SYMBOL(ceph_monc_request_next_osdmap); +/* + * Wait for an osdmap with a given epoch. + * + * @epoch: epoch to wait for + * @timeout: in jiffies, 0 means "wait forever" + */ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, unsigned long timeout) { unsigned long started = jiffies; - int ret; + long ret; mutex_lock(&monc->mutex); while (monc->have_osdmap < epoch) { mutex_unlock(&monc->mutex); - if (timeout != 0 && time_after_eq(jiffies, started + timeout)) + if (timeout && time_after_eq(jiffies, started + timeout)) return -ETIMEDOUT; ret = wait_event_interruptible_timeout(monc->client->auth_wq, - monc->have_osdmap >= epoch, timeout); + monc->have_osdmap >= epoch, + ceph_timeout_jiffies(timeout)); if (ret < 0) return ret; @@ -736,11 +752,23 @@ static void delayed_work(struct work_struct *work) __close_session(monc); __open_session(monc); /* continue hunting */ } else { - ceph_con_keepalive(&monc->con); + struct ceph_options *opt = monc->client->options; + int is_auth = ceph_auth_is_authenticated(monc->auth); + if (ceph_con_keepalive_expired(&monc->con, + opt->monc_ping_timeout)) { + dout("monc keepalive timeout\n"); + is_auth = 0; + __close_session(monc); + monc->hunting = true; + __open_session(monc); + } - __validate_auth(monc); + if (!monc->hunting) { + ceph_con_keepalive(&monc->con); + __validate_auth(monc); + } - if (ceph_auth_is_authenticated(monc->auth)) + if (is_auth) __send_subscribe(monc); } __schedule_delayed(monc); diff --git a/kernel/net/ceph/osd_client.c b/kernel/net/ceph/osd_client.c index c4ec92392..a28e47ff1 100644 --- a/kernel/net/ceph/osd_client.c +++ b/kernel/net/ceph/osd_client.c @@ -120,11 +120,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data, } #endif /* CONFIG_BLOCK */ -#define osd_req_op_data(oreq, whch, typ, fld) \ - ({ \ - BUG_ON(whch >= (oreq)->r_num_ops); \ - &(oreq)->r_ops[whch].typ.fld; \ - }) +#define osd_req_op_data(oreq, whch, typ, fld) \ +({ \ + struct ceph_osd_request *__oreq = (oreq); \ + unsigned int __whch = (whch); \ + BUG_ON(__whch >= __oreq->r_num_ops); \ + &__oreq->r_ops[__whch].typ.fld; \ +}) static struct ceph_osd_data * osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which) @@ -285,6 +287,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, switch (op->op) { case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: ceph_osd_data_release(&op->extent.osd_data); break; case CEPH_OSD_OP_CALL: @@ -296,6 +299,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req, case CEPH_OSD_OP_CMPXATTR: ceph_osd_data_release(&op->xattr.osd_data); break; + case CEPH_OSD_OP_STAT: + ceph_osd_data_release(&op->raw_data_in); + break; default: break; } @@ -450,7 +456,7 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE) */ static struct ceph_osd_req_op * _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, - u16 opcode) + u16 opcode, u32 flags) { struct ceph_osd_req_op *op; @@ -460,14 +466,15 @@ _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, op = &osd_req->r_ops[which]; memset(op, 0, sizeof (*op)); op->op = opcode; + op->flags = flags; return op; } void osd_req_op_init(struct ceph_osd_request *osd_req, - unsigned int which, u16 opcode) + unsigned int which, u16 opcode, u32 flags) { - (void)_osd_req_op_init(osd_req, which, opcode); + (void)_osd_req_op_init(osd_req, which, opcode, flags); } EXPORT_SYMBOL(osd_req_op_init); @@ -476,17 +483,19 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req, u64 offset, u64 length, u64 truncate_size, u32 truncate_seq) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); size_t payload_len = 0; BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE && - opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE); + opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO && + opcode != CEPH_OSD_OP_TRUNCATE); op->extent.offset = offset; op->extent.length = length; op->extent.truncate_size = truncate_size; op->extent.truncate_seq = truncate_seq; - if (opcode == CEPH_OSD_OP_WRITE) + if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL) payload_len += length; op->payload_len = payload_len; @@ -515,7 +524,8 @@ EXPORT_SYMBOL(osd_req_op_extent_update); void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); struct ceph_pagelist *pagelist; size_t payload_len = 0; size_t size; @@ -552,7 +562,8 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *name, const void *value, size_t size, u8 cmp_op, u8 cmp_mode) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); struct ceph_pagelist *pagelist; size_t payload_len; @@ -585,7 +596,8 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u64 cookie, u64 version, int flag) { - struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode); + struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, + opcode, 0); BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH); @@ -602,7 +614,8 @@ void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req, u64 expected_write_size) { struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, - CEPH_OSD_OP_SETALLOCHINT); + CEPH_OSD_OP_SETALLOCHINT, + 0); op->alloc_hint.expected_object_size = expected_object_size; op->alloc_hint.expected_write_size = expected_write_size; @@ -661,9 +674,11 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, break; case CEPH_OSD_OP_READ: case CEPH_OSD_OP_WRITE: + case CEPH_OSD_OP_WRITEFULL: case CEPH_OSD_OP_ZERO: case CEPH_OSD_OP_TRUNCATE: - if (src->op == CEPH_OSD_OP_WRITE) + if (src->op == CEPH_OSD_OP_WRITE || + src->op == CEPH_OSD_OP_WRITEFULL) request_data_len = src->extent.length; dst->extent.offset = cpu_to_le64(src->extent.offset); dst->extent.length = cpu_to_le64(src->extent.length); @@ -672,7 +687,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, dst->extent.truncate_seq = cpu_to_le32(src->extent.truncate_seq); osd_data = &src->extent.osd_data; - if (src->op == CEPH_OSD_OP_WRITE) + if (src->op == CEPH_OSD_OP_WRITE || + src->op == CEPH_OSD_OP_WRITEFULL) ceph_osdc_msg_data_add(req->r_request, osd_data); else ceph_osdc_msg_data_add(req->r_reply, osd_data); @@ -786,7 +802,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, } if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { - osd_req_op_init(req, which, opcode); + osd_req_op_init(req, which, opcode, 0); } else { u32 object_size = le32_to_cpu(layout->fl_object_size); u32 object_base = off - objoff; @@ -1088,7 +1104,7 @@ static void __move_osd_to_lru(struct ceph_osd_client *osdc, BUG_ON(!list_empty(&osd->o_osd_lru)); list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); - osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; + osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl; } static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc, @@ -1199,7 +1215,7 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) static void __schedule_osd_timeout(struct ceph_osd_client *osdc) { schedule_delayed_work(&osdc->timeout_work, - osdc->client->options->osd_keepalive_timeout * HZ); + osdc->client->options->osd_keepalive_timeout); } static void __cancel_osd_timeout(struct ceph_osd_client *osdc) @@ -1567,10 +1583,9 @@ static void handle_timeout(struct work_struct *work) { struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, timeout_work.work); + struct ceph_options *opts = osdc->client->options; struct ceph_osd_request *req; struct ceph_osd *osd; - unsigned long keepalive = - osdc->client->options->osd_keepalive_timeout * HZ; struct list_head slow_osds; dout("timeout\n"); down_read(&osdc->map_sem); @@ -1586,7 +1601,8 @@ static void handle_timeout(struct work_struct *work) */ INIT_LIST_HEAD(&slow_osds); list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { - if (time_before(jiffies, req->r_stamp + keepalive)) + if (time_before(jiffies, + req->r_stamp + opts->osd_keepalive_timeout)) break; osd = req->r_osd; @@ -1613,8 +1629,7 @@ static void handle_osds_timeout(struct work_struct *work) struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, osds_timeout_work.work); - unsigned long delay = - osdc->client->options->osd_idle_ttl * HZ >> 2; + unsigned long delay = osdc->client->options->osd_idle_ttl / 4; dout("osds timeout\n"); down_read(&osdc->map_sem); @@ -1737,8 +1752,7 @@ static void complete_request(struct ceph_osd_request *req) * handle osd op reply. either call the callback if it is specified, * or do the completion to wake up the waiting thread. */ -static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, - struct ceph_connection *con) +static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) { void *p, *end; struct ceph_osd_request *req; @@ -2619,7 +2633,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) osdc->event_count = 0; schedule_delayed_work(&osdc->osds_timeout_work, - round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); + round_jiffies_relative(osdc->client->options->osd_idle_ttl)); err = -ENOMEM; osdc->req_mempool = mempool_create_kmalloc_pool(10, @@ -2794,7 +2808,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ceph_osdc_handle_map(osdc, msg); break; case CEPH_MSG_OSD_OPREPLY: - handle_reply(osdc, msg, con); + handle_reply(osdc, msg); break; case CEPH_MSG_WATCH_NOTIFY: handle_watch_notify(osdc, msg); @@ -2809,8 +2823,9 @@ out: } /* - * lookup and return message for incoming reply. set up reply message - * pages. + * Lookup and return message for incoming reply. Don't try to do + * anything about a larger than preallocated data portion of the + * message at the moment - for now, just skip the message. */ static struct ceph_msg *get_reply(struct ceph_connection *con, struct ceph_msg_header *hdr, @@ -2828,23 +2843,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, mutex_lock(&osdc->request_mutex); req = __lookup_request(osdc, tid); if (!req) { - *skip = 1; + dout("%s osd%d tid %llu unknown, skipping\n", __func__, + osd->o_osd, tid); m = NULL; - dout("get_reply unknown tid %llu from osd%d\n", tid, - osd->o_osd); + *skip = 1; goto out; } - if (req->r_reply->con) - dout("%s revoking msg %p from old con %p\n", __func__, - req->r_reply, req->r_reply->con); ceph_msg_revoke_incoming(req->r_reply); if (front_len > req->r_reply->front_alloc_len) { - pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n", - front_len, req->r_reply->front_alloc_len, - (unsigned int)con->peer_name.type, - le64_to_cpu(con->peer_name.num)); + pr_warn("%s osd%d tid %llu front %d > preallocated %d\n", + __func__, osd->o_osd, req->r_tid, front_len, + req->r_reply->front_alloc_len); m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, false); if (!m) @@ -2852,37 +2863,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ceph_msg_put(req->r_reply); req->r_reply = m; } - m = ceph_msg_get(req->r_reply); - - if (data_len > 0) { - struct ceph_osd_data *osd_data; - /* - * XXX This is assuming there is only one op containing - * XXX page data. Probably OK for reads, but this - * XXX ought to be done more generally. - */ - osd_data = osd_req_op_extent_osd_data(req, 0); - if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) { - if (osd_data->pages && - unlikely(osd_data->length < data_len)) { - - pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n", - tid, data_len, osd_data->length); - *skip = 1; - ceph_msg_put(m); - m = NULL; - goto out; - } - } + if (data_len > req->r_reply->data_length) { + pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n", + __func__, osd->o_osd, req->r_tid, data_len, + req->r_reply->data_length); + m = NULL; + *skip = 1; + goto out; } - *skip = 0; + + m = ceph_msg_get(req->r_reply); dout("get_reply tid %lld %p\n", tid, m); out: mutex_unlock(&osdc->request_mutex); return m; - } static struct ceph_msg *alloc_msg(struct ceph_connection *con, @@ -2980,17 +2976,19 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&osdc->client->monc); } -static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +static int osd_sign_message(struct ceph_msg *msg) { - struct ceph_osd *o = con->private; + struct ceph_osd *o = msg->con->private; struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_sign_message(auth, msg); } -static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +static int osd_check_message_signature(struct ceph_msg *msg) { - struct ceph_osd *o = con->private; + struct ceph_osd *o = msg->con->private; struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_check_message_signature(auth, msg); } @@ -3002,7 +3000,7 @@ static const struct ceph_connection_operations osd_con_ops = { .verify_authorizer_reply = verify_authorizer_reply, .invalidate_authorizer = invalidate_authorizer, .alloc_msg = alloc_msg, - .sign_message = sign_message, - .check_message_signature = check_message_signature, + .sign_message = osd_sign_message, + .check_message_signature = osd_check_message_signature, .fault = osd_reset, }; diff --git a/kernel/net/ceph/osdmap.c b/kernel/net/ceph/osdmap.c index 4a3125836..7d8f581d9 100644 --- a/kernel/net/ceph/osdmap.c +++ b/kernel/net/ceph/osdmap.c @@ -1300,7 +1300,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, ceph_decode_addr(&addr); pr_info("osd%d up\n", osd); BUG_ON(osd >= map->max_osd); - map->osd_state[osd] |= CEPH_OSD_UP; + map->osd_state[osd] |= CEPH_OSD_UP | CEPH_OSD_EXISTS; map->osd_addr[osd] = addr; } diff --git a/kernel/net/ceph/pagevec.c b/kernel/net/ceph/pagevec.c index 096d91447..d4f5f220a 100644 --- a/kernel/net/ceph/pagevec.c +++ b/kernel/net/ceph/pagevec.c @@ -51,10 +51,7 @@ void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty) set_page_dirty_lock(pages[i]); put_page(pages[i]); } - if (is_vmalloc_addr(pages)) - vfree(pages); - else - kfree(pages); + kvfree(pages); } EXPORT_SYMBOL(ceph_put_page_vector); -- cgit 1.2.3-korg