summaryrefslogtreecommitdiffstats
path: root/kernel/net/ceph
diff options
context:
space:
mode:
authorJosé Pekkarinen <jose.pekkarinen@nokia.com>2016-04-11 10:41:07 +0300
committerJosé Pekkarinen <jose.pekkarinen@nokia.com>2016-04-13 08:17:18 +0300
commite09b41010ba33a20a87472ee821fa407a5b8da36 (patch)
treed10dc367189862e7ca5c592f033dc3726e1df4e3 /kernel/net/ceph
parentf93b97fd65072de626c074dbe099a1fff05ce060 (diff)
These changes are the raw update to linux-4.4.6-rt14. Kernel sources
are taken from kernel.org, and rt patch from the rt wiki download page. During the rebasing, the following patch collided: Force tick interrupt and get rid of softirq magic(I70131fb85). Collisions have been removed because its logic was found on the source already. Change-Id: I7f57a4081d9deaa0d9ccfc41a6c8daccdee3b769 Signed-off-by: José Pekkarinen <jose.pekkarinen@nokia.com>
Diffstat (limited to 'kernel/net/ceph')
-rw-r--r--kernel/net/ceph/auth_x.c36
-rw-r--r--kernel/net/ceph/ceph_common.c87
-rw-r--r--kernel/net/ceph/crush/crush.c13
-rw-r--r--kernel/net/ceph/crush/crush_ln_table.h32
-rw-r--r--kernel/net/ceph/crush/hash.c8
-rw-r--r--kernel/net/ceph/crush/mapper.c148
-rw-r--r--kernel/net/ceph/crypto.c10
-rw-r--r--kernel/net/ceph/crypto.h4
-rw-r--r--kernel/net/ceph/messenger.c291
-rw-r--r--kernel/net/ceph/mon_client.c50
-rw-r--r--kernel/net/ceph/osd_client.c140
-rw-r--r--kernel/net/ceph/osdmap.c2
-rw-r--r--kernel/net/ceph/pagevec.c5
13 files changed, 510 insertions, 316 deletions
diff --git a/kernel/net/ceph/auth_x.c b/kernel/net/ceph/auth_x.c
index ba6eb1722..10d87753e 100644
--- a/kernel/net/ceph/auth_x.c
+++ b/kernel/net/ceph/auth_x.c
@@ -8,6 +8,7 @@
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
+#include <linux/ceph/libceph.h>
#include <linux/ceph/messenger.h>
#include "crypto.h"
@@ -279,6 +280,15 @@ bad:
return -EINVAL;
}
+static void ceph_x_authorizer_cleanup(struct ceph_x_authorizer *au)
+{
+ ceph_crypto_key_destroy(&au->session_key);
+ if (au->buf) {
+ ceph_buffer_put(au->buf);
+ au->buf = NULL;
+ }
+}
+
static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
struct ceph_x_ticket_handler *th,
struct ceph_x_authorizer *au)
@@ -297,7 +307,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
ceph_crypto_key_destroy(&au->session_key);
ret = ceph_crypto_key_clone(&au->session_key, &th->session_key);
if (ret)
- return ret;
+ goto out_au;
maxlen = sizeof(*msg_a) + sizeof(msg_b) +
ceph_x_encrypt_buflen(ticket_blob_len);
@@ -309,8 +319,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
if (!au->buf) {
au->buf = ceph_buffer_new(maxlen, GFP_NOFS);
if (!au->buf) {
- ceph_crypto_key_destroy(&au->session_key);
- return -ENOMEM;
+ ret = -ENOMEM;
+ goto out_au;
}
}
au->service = th->service;
@@ -340,7 +350,7 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
ret = ceph_x_encrypt(&au->session_key, &msg_b, sizeof(msg_b),
p, end - p);
if (ret < 0)
- goto out_buf;
+ goto out_au;
p += ret;
au->buf->vec.iov_len = p - au->buf->vec.iov_base;
dout(" built authorizer nonce %llx len %d\n", au->nonce,
@@ -348,9 +358,8 @@ static int ceph_x_build_authorizer(struct ceph_auth_client *ac,
BUG_ON(au->buf->vec.iov_len > maxlen);
return 0;
-out_buf:
- ceph_buffer_put(au->buf);
- au->buf = NULL;
+out_au:
+ ceph_x_authorizer_cleanup(au);
return ret;
}
@@ -624,8 +633,7 @@ static void ceph_x_destroy_authorizer(struct ceph_auth_client *ac,
{
struct ceph_x_authorizer *au = (void *)a;
- ceph_crypto_key_destroy(&au->session_key);
- ceph_buffer_put(au->buf);
+ ceph_x_authorizer_cleanup(au);
kfree(au);
}
@@ -653,8 +661,7 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
remove_ticket_handler(ac, th);
}
- if (xi->auth_authorizer.buf)
- ceph_buffer_put(xi->auth_authorizer.buf);
+ ceph_x_authorizer_cleanup(&xi->auth_authorizer);
kfree(ac->private);
ac->private = NULL;
@@ -691,8 +698,10 @@ static int ceph_x_sign_message(struct ceph_auth_handshake *auth,
struct ceph_msg *msg)
{
int ret;
- if (!auth->authorizer)
+
+ if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
return 0;
+
ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
msg, &msg->footer.sig);
if (ret < 0)
@@ -707,8 +716,9 @@ static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth,
__le64 sig_check;
int ret;
- if (!auth->authorizer)
+ if (ceph_test_opt(from_msgr(msg->con->msgr), NOMSGSIGN))
return 0;
+
ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer,
msg, &sig_check);
if (ret < 0)
diff --git a/kernel/net/ceph/ceph_common.c b/kernel/net/ceph/ceph_common.c
index 3f76eb84b..bcbec33c6 100644
--- a/kernel/net/ceph/ceph_common.c
+++ b/kernel/net/ceph/ceph_common.c
@@ -9,6 +9,7 @@
#include <keys/ceph-type.h>
#include <linux/module.h>
#include <linux/mount.h>
+#include <linux/nsproxy.h>
#include <linux/parser.h>
#include <linux/sched.h>
#include <linux/seq_file.h>
@@ -16,8 +17,6 @@
#include <linux/statfs.h>
#include <linux/string.h>
#include <linux/vmalloc.h>
-#include <linux/nsproxy.h>
-#include <net/net_namespace.h>
#include <linux/ceph/ceph_features.h>
@@ -131,6 +130,13 @@ int ceph_compare_options(struct ceph_options *new_opt,
int i;
int ret;
+ /*
+ * Don't bother comparing options if network namespaces don't
+ * match.
+ */
+ if (!net_eq(current->nsproxy->net_ns, read_pnet(&client->msgr.net)))
+ return -1;
+
ret = memcmp(opt1, opt2, ofs);
if (ret)
return ret;
@@ -239,6 +245,8 @@ enum {
Opt_nocrc,
Opt_cephx_require_signatures,
Opt_nocephx_require_signatures,
+ Opt_cephx_sign_messages,
+ Opt_nocephx_sign_messages,
Opt_tcp_nodelay,
Opt_notcp_nodelay,
};
@@ -261,6 +269,8 @@ static match_table_t opt_tokens = {
{Opt_nocrc, "nocrc"},
{Opt_cephx_require_signatures, "cephx_require_signatures"},
{Opt_nocephx_require_signatures, "nocephx_require_signatures"},
+ {Opt_cephx_sign_messages, "cephx_sign_messages"},
+ {Opt_nocephx_sign_messages, "nocephx_sign_messages"},
{Opt_tcp_nodelay, "tcp_nodelay"},
{Opt_notcp_nodelay, "notcp_nodelay"},
{-1, NULL}
@@ -312,7 +322,7 @@ static int get_secret(struct ceph_crypto_key *dst, const char *name) {
goto out;
}
- ckey = ukey->payload.data;
+ ckey = ukey->payload.data[0];
err = ceph_crypto_key_clone(dst, ckey);
if (err)
goto out_key;
@@ -335,9 +345,6 @@ ceph_parse_options(char *options, const char *dev_name,
int err = -ENOMEM;
substring_t argstr[MAX_OPT_ARGS];
- if (current->nsproxy->net_ns != &init_net)
- return ERR_PTR(-EINVAL);
-
opt = kzalloc(sizeof(*opt), GFP_KERNEL);
if (!opt)
return ERR_PTR(-ENOMEM);
@@ -352,8 +359,9 @@ ceph_parse_options(char *options, const char *dev_name,
/* start with defaults */
opt->flags = CEPH_OPT_DEFAULT;
opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
- opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
- opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
+ opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
+ opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
+ opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
/* get mon ip(s) */
/* ip1[:port1][,ip2[:port2]...] */
@@ -439,13 +447,32 @@ ceph_parse_options(char *options, const char *dev_name,
pr_warn("ignoring deprecated osdtimeout option\n");
break;
case Opt_osdkeepalivetimeout:
- opt->osd_keepalive_timeout = intval;
+ /* 0 isn't well defined right now, reject it */
+ if (intval < 1 || intval > INT_MAX / 1000) {
+ pr_err("osdkeepalive out of range\n");
+ err = -EINVAL;
+ goto out;
+ }
+ opt->osd_keepalive_timeout =
+ msecs_to_jiffies(intval * 1000);
break;
case Opt_osd_idle_ttl:
- opt->osd_idle_ttl = intval;
+ /* 0 isn't well defined right now, reject it */
+ if (intval < 1 || intval > INT_MAX / 1000) {
+ pr_err("osd_idle_ttl out of range\n");
+ err = -EINVAL;
+ goto out;
+ }
+ opt->osd_idle_ttl = msecs_to_jiffies(intval * 1000);
break;
case Opt_mount_timeout:
- opt->mount_timeout = intval;
+ /* 0 is "wait forever" (i.e. infinite timeout) */
+ if (intval < 0 || intval > INT_MAX / 1000) {
+ pr_err("mount_timeout out of range\n");
+ err = -EINVAL;
+ goto out;
+ }
+ opt->mount_timeout = msecs_to_jiffies(intval * 1000);
break;
case Opt_share:
@@ -468,6 +495,12 @@ ceph_parse_options(char *options, const char *dev_name,
case Opt_nocephx_require_signatures:
opt->flags |= CEPH_OPT_NOMSGAUTH;
break;
+ case Opt_cephx_sign_messages:
+ opt->flags &= ~CEPH_OPT_NOMSGSIGN;
+ break;
+ case Opt_nocephx_sign_messages:
+ opt->flags |= CEPH_OPT_NOMSGSIGN;
+ break;
case Opt_tcp_nodelay:
opt->flags |= CEPH_OPT_TCP_NODELAY;
@@ -511,16 +544,20 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client)
seq_puts(m, "nocrc,");
if (opt->flags & CEPH_OPT_NOMSGAUTH)
seq_puts(m, "nocephx_require_signatures,");
+ if (opt->flags & CEPH_OPT_NOMSGSIGN)
+ seq_puts(m, "nocephx_sign_messages,");
if ((opt->flags & CEPH_OPT_TCP_NODELAY) == 0)
seq_puts(m, "notcp_nodelay,");
if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT)
- seq_printf(m, "mount_timeout=%d,", opt->mount_timeout);
+ seq_printf(m, "mount_timeout=%d,",
+ jiffies_to_msecs(opt->mount_timeout) / 1000);
if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
- seq_printf(m, "osd_idle_ttl=%d,", opt->osd_idle_ttl);
+ seq_printf(m, "osd_idle_ttl=%d,",
+ jiffies_to_msecs(opt->osd_idle_ttl) / 1000);
if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
seq_printf(m, "osdkeepalivetimeout=%d,",
- opt->osd_keepalive_timeout);
+ jiffies_to_msecs(opt->osd_keepalive_timeout) / 1000);
/* drop redundant comma */
if (m->count != pos)
@@ -571,11 +608,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
if (ceph_test_opt(client, MYIP))
myaddr = &client->options->my_addr;
- ceph_messenger_init(&client->msgr, myaddr,
- client->supported_features,
- client->required_features,
- ceph_test_opt(client, NOCRC),
- ceph_test_opt(client, TCP_NODELAY));
+ ceph_messenger_init(&client->msgr, myaddr);
/* subsystems */
err = ceph_monc_init(&client->monc, client);
@@ -590,6 +623,7 @@ struct ceph_client *ceph_create_client(struct ceph_options *opt, void *private,
fail_monc:
ceph_monc_stop(&client->monc);
fail:
+ ceph_messenger_fini(&client->msgr);
kfree(client);
return ERR_PTR(err);
}
@@ -603,8 +637,8 @@ void ceph_destroy_client(struct ceph_client *client)
/* unmount */
ceph_osdc_stop(&client->osdc);
-
ceph_monc_stop(&client->monc);
+ ceph_messenger_fini(&client->msgr);
ceph_debugfs_client_cleanup(client);
@@ -629,8 +663,8 @@ static int have_mon_and_osd_map(struct ceph_client *client)
*/
int __ceph_open_session(struct ceph_client *client, unsigned long started)
{
- int err;
- unsigned long timeout = client->options->mount_timeout * HZ;
+ unsigned long timeout = client->options->mount_timeout;
+ long err;
/* open session, and wait for mon and osd maps */
err = ceph_monc_open_session(&client->monc);
@@ -638,16 +672,15 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started)
return err;
while (!have_mon_and_osd_map(client)) {
- err = -EIO;
if (timeout && time_after_eq(jiffies, started + timeout))
- return err;
+ return -ETIMEDOUT;
/* wait */
dout("mount waiting for mon_map\n");
err = wait_event_interruptible_timeout(client->auth_wq,
have_mon_and_osd_map(client) || (client->auth_err < 0),
- timeout);
- if (err == -EINTR || err == -ERESTARTSYS)
+ ceph_timeout_jiffies(timeout));
+ if (err < 0)
return err;
if (client->auth_err < 0)
return client->auth_err;
@@ -724,5 +757,5 @@ module_exit(exit_ceph_lib);
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_AUTHOR("Patience Warnick <patience@newdream.net>");
-MODULE_DESCRIPTION("Ceph filesystem for Linux");
+MODULE_DESCRIPTION("Ceph core library");
MODULE_LICENSE("GPL");
diff --git a/kernel/net/ceph/crush/crush.c b/kernel/net/ceph/crush/crush.c
index 9d84ce4ea..80d7c3a97 100644
--- a/kernel/net/ceph/crush/crush.c
+++ b/kernel/net/ceph/crush/crush.c
@@ -1,15 +1,11 @@
-
#ifdef __KERNEL__
# include <linux/slab.h>
+# include <linux/crush/crush.h>
#else
-# include <stdlib.h>
-# include <assert.h>
-# define kfree(x) do { if (x) free(x); } while (0)
-# define BUG_ON(x) assert(!(x))
+# include "crush_compat.h"
+# include "crush.h"
#endif
-#include <linux/crush/crush.h>
-
const char *crush_bucket_alg_name(int alg)
{
switch (alg) {
@@ -134,6 +130,9 @@ void crush_destroy(struct crush_map *map)
kfree(map->rules);
}
+#ifndef __KERNEL__
+ kfree(map->choose_tries);
+#endif
kfree(map);
}
diff --git a/kernel/net/ceph/crush/crush_ln_table.h b/kernel/net/ceph/crush/crush_ln_table.h
index 6192c7fc9..aae534c90 100644
--- a/kernel/net/ceph/crush/crush_ln_table.h
+++ b/kernel/net/ceph/crush/crush_ln_table.h
@@ -10,20 +10,20 @@
*
*/
-#if defined(__linux__)
-#include <linux/types.h>
-#elif defined(__FreeBSD__)
-#include <sys/types.h>
-#endif
-
#ifndef CEPH_CRUSH_LN_H
#define CEPH_CRUSH_LN_H
+#ifdef __KERNEL__
+# include <linux/types.h>
+#else
+# include "crush_compat.h"
+#endif
-// RH_LH_tbl[2*k] = 2^48/(1.0+k/128.0)
-// RH_LH_tbl[2*k+1] = 2^48*log2(1.0+k/128.0)
-
-static int64_t __RH_LH_tbl[128*2+2] = {
+/*
+ * RH_LH_tbl[2*k] = 2^48/(1.0+k/128.0)
+ * RH_LH_tbl[2*k+1] = 2^48*log2(1.0+k/128.0)
+ */
+static __s64 __RH_LH_tbl[128*2+2] = {
0x0001000000000000ll, 0x0000000000000000ll, 0x0000fe03f80fe040ll, 0x000002dfca16dde1ll,
0x0000fc0fc0fc0fc1ll, 0x000005b9e5a170b4ll, 0x0000fa232cf25214ll, 0x0000088e68ea899all,
0x0000f83e0f83e0f9ll, 0x00000b5d69bac77ell, 0x0000f6603d980f67ll, 0x00000e26fd5c8555ll,
@@ -89,11 +89,12 @@ static int64_t __RH_LH_tbl[128*2+2] = {
0x0000820820820821ll, 0x0000fa2f045e7832ll, 0x000081848da8faf1ll, 0x0000fba577877d7dll,
0x0000810204081021ll, 0x0000fd1a708bbe11ll, 0x0000808080808081ll, 0x0000fe8df263f957ll,
0x0000800000000000ll, 0x0000ffff00000000ll,
- };
-
+};
- // LL_tbl[k] = 2^48*log2(1.0+k/2^15);
-static int64_t __LL_tbl[256] = {
+/*
+ * LL_tbl[k] = 2^48*log2(1.0+k/2^15)
+ */
+static __s64 __LL_tbl[256] = {
0x0000000000000000ull, 0x00000002e2a60a00ull, 0x000000070cb64ec5ull, 0x00000009ef50ce67ull,
0x0000000cd1e588fdull, 0x0000000fb4747e9cull, 0x0000001296fdaf5eull, 0x0000001579811b58ull,
0x000000185bfec2a1ull, 0x0000001b3e76a552ull, 0x0000001e20e8c380ull, 0x0000002103551d43ull,
@@ -160,7 +161,4 @@ static int64_t __LL_tbl[256] = {
0x000002d4562d2ec6ull, 0x000002d73330209dull, 0x000002da102d63b0ull, 0x000002dced24f814ull,
};
-
-
-
#endif
diff --git a/kernel/net/ceph/crush/hash.c b/kernel/net/ceph/crush/hash.c
index 5bb63e37a..ed123af49 100644
--- a/kernel/net/ceph/crush/hash.c
+++ b/kernel/net/ceph/crush/hash.c
@@ -1,6 +1,8 @@
-
-#include <linux/types.h>
-#include <linux/crush/hash.h>
+#ifdef __KERNEL__
+# include <linux/crush/hash.h>
+#else
+# include "hash.h"
+#endif
/*
* Robert Jenkins' function for mixing 32-bit values
diff --git a/kernel/net/ceph/crush/mapper.c b/kernel/net/ceph/crush/mapper.c
index 5b47736d2..393bfb22d 100644
--- a/kernel/net/ceph/crush/mapper.c
+++ b/kernel/net/ceph/crush/mapper.c
@@ -1,27 +1,31 @@
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2015 Intel Corporation All Rights Reserved
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
#ifdef __KERNEL__
# include <linux/string.h>
# include <linux/slab.h>
# include <linux/bug.h>
# include <linux/kernel.h>
-# ifndef dprintk
-# define dprintk(args...)
-# endif
+# include <linux/crush/crush.h>
+# include <linux/crush/hash.h>
#else
-# include <string.h>
-# include <stdio.h>
-# include <stdlib.h>
-# include <assert.h>
-# define BUG_ON(x) assert(!(x))
-# define dprintk(args...) /* printf(args) */
-# define kmalloc(x, f) malloc(x)
-# define kfree(x) free(x)
+# include "crush_compat.h"
+# include "crush.h"
+# include "hash.h"
#endif
-
-#include <linux/crush/crush.h>
-#include <linux/crush/hash.h>
#include "crush_ln_table.h"
+#define dprintk(args...) /* printf(args) */
+
/*
* Implement the core CRUSH mapping algorithm.
*/
@@ -139,7 +143,7 @@ static int bucket_list_choose(struct crush_bucket_list *bucket,
int i;
for (i = bucket->h.size-1; i >= 0; i--) {
- __u64 w = crush_hash32_4(bucket->h.hash,x, bucket->h.items[i],
+ __u64 w = crush_hash32_4(bucket->h.hash, x, bucket->h.items[i],
r, bucket->h.id);
w &= 0xffff;
dprintk("list_choose i=%d x=%d r=%d item %d weight %x "
@@ -238,43 +242,46 @@ static int bucket_straw_choose(struct crush_bucket_straw *bucket,
return bucket->h.items[high];
}
-// compute 2^44*log2(input+1)
-uint64_t crush_ln(unsigned xin)
+/* compute 2^44*log2(input+1) */
+static __u64 crush_ln(unsigned int xin)
{
- unsigned x=xin, x1;
- int iexpon, index1, index2;
- uint64_t RH, LH, LL, xl64, result;
+ unsigned int x = xin, x1;
+ int iexpon, index1, index2;
+ __u64 RH, LH, LL, xl64, result;
- x++;
+ x++;
- // normalize input
- iexpon = 15;
- while(!(x&0x18000)) { x<<=1; iexpon--; }
+ /* normalize input */
+ iexpon = 15;
+ while (!(x & 0x18000)) {
+ x <<= 1;
+ iexpon--;
+ }
- index1 = (x>>8)<<1;
- // RH ~ 2^56/index1
- RH = __RH_LH_tbl[index1 - 256];
- // LH ~ 2^48 * log2(index1/256)
- LH = __RH_LH_tbl[index1 + 1 - 256];
+ index1 = (x >> 8) << 1;
+ /* RH ~ 2^56/index1 */
+ RH = __RH_LH_tbl[index1 - 256];
+ /* LH ~ 2^48 * log2(index1/256) */
+ LH = __RH_LH_tbl[index1 + 1 - 256];
- // RH*x ~ 2^48 * (2^15 + xf), xf<2^8
- xl64 = (int64_t)x * RH;
- xl64 >>= 48;
- x1 = xl64;
+ /* RH*x ~ 2^48 * (2^15 + xf), xf<2^8 */
+ xl64 = (__s64)x * RH;
+ xl64 >>= 48;
+ x1 = xl64;
- result = iexpon;
- result <<= (12 + 32);
+ result = iexpon;
+ result <<= (12 + 32);
- index2 = x1 & 0xff;
- // LL ~ 2^48*log2(1.0+index2/2^15)
- LL = __LL_tbl[index2];
+ index2 = x1 & 0xff;
+ /* LL ~ 2^48*log2(1.0+index2/2^15) */
+ LL = __LL_tbl[index2];
- LH = LH + LL;
+ LH = LH + LL;
- LH >>= (48-12 - 32);
- result += LH;
+ LH >>= (48 - 12 - 32);
+ result += LH;
- return result;
+ return result;
}
@@ -290,9 +297,9 @@ uint64_t crush_ln(unsigned xin)
static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket,
int x, int r)
{
- unsigned i, high = 0;
- unsigned u;
- unsigned w;
+ unsigned int i, high = 0;
+ unsigned int u;
+ unsigned int w;
__s64 ln, draw, high_draw = 0;
for (i = 0; i < bucket->h.size; i++) {
@@ -567,6 +574,10 @@ reject:
out[outpos] = item;
outpos++;
count--;
+#ifndef __KERNEL__
+ if (map->choose_tries && ftotal <= map->choose_total_tries)
+ map->choose_tries[ftotal]++;
+#endif
}
dprintk("CHOOSE returns %d\n", outpos);
@@ -610,6 +621,20 @@ static void crush_choose_indep(const struct crush_map *map,
}
for (ftotal = 0; left > 0 && ftotal < tries; ftotal++) {
+#ifdef DEBUG_INDEP
+ if (out2 && ftotal) {
+ dprintk("%u %d a: ", ftotal, left);
+ for (rep = outpos; rep < endpos; rep++) {
+ dprintk(" %d", out[rep]);
+ }
+ dprintk("\n");
+ dprintk("%u %d b: ", ftotal, left);
+ for (rep = outpos; rep < endpos; rep++) {
+ dprintk(" %d", out2[rep]);
+ }
+ dprintk("\n");
+ }
+#endif
for (rep = outpos; rep < endpos; rep++) {
if (out[rep] != CRUSH_ITEM_UNDEF)
continue;
@@ -726,6 +751,24 @@ static void crush_choose_indep(const struct crush_map *map,
out2[rep] = CRUSH_ITEM_NONE;
}
}
+#ifndef __KERNEL__
+ if (map->choose_tries && ftotal <= map->choose_total_tries)
+ map->choose_tries[ftotal]++;
+#endif
+#ifdef DEBUG_INDEP
+ if (out2) {
+ dprintk("%u %d a: ", ftotal, left);
+ for (rep = outpos; rep < endpos; rep++) {
+ dprintk(" %d", out[rep]);
+ }
+ dprintk("\n");
+ dprintk("%u %d b: ", ftotal, left);
+ for (rep = outpos; rep < endpos; rep++) {
+ dprintk(" %d", out2[rep]);
+ }
+ dprintk("\n");
+ }
+#endif
}
/**
@@ -790,8 +833,15 @@ int crush_do_rule(const struct crush_map *map,
switch (curstep->op) {
case CRUSH_RULE_TAKE:
- w[0] = curstep->arg1;
- wsize = 1;
+ if ((curstep->arg1 >= 0 &&
+ curstep->arg1 < map->max_devices) ||
+ (-1-curstep->arg1 < map->max_buckets &&
+ map->buckets[-1-curstep->arg1])) {
+ w[0] = curstep->arg1;
+ wsize = 1;
+ } else {
+ dprintk(" bad take value %d\n", curstep->arg1);
+ }
break;
case CRUSH_RULE_SET_CHOOSE_TRIES:
@@ -877,7 +927,7 @@ int crush_do_rule(const struct crush_map *map,
0);
} else {
out_size = ((numrep < (result_max-osize)) ?
- numrep : (result_max-osize));
+ numrep : (result_max-osize));
crush_choose_indep(
map,
map->buckets[-1-w[i]],
@@ -923,5 +973,3 @@ int crush_do_rule(const struct crush_map *map,
}
return result_len;
}
-
-
diff --git a/kernel/net/ceph/crypto.c b/kernel/net/ceph/crypto.c
index 790fe89d9..42e8649c6 100644
--- a/kernel/net/ceph/crypto.c
+++ b/kernel/net/ceph/crypto.c
@@ -79,10 +79,6 @@ int ceph_crypto_key_unarmor(struct ceph_crypto_key *key, const char *inkey)
return 0;
}
-
-
-#define AES_KEY_SIZE 16
-
static struct crypto_blkcipher *ceph_crypto_alloc_cipher(void)
{
return crypto_alloc_blkcipher("cbc(aes)", 0, CRYPTO_ALG_ASYNC);
@@ -541,7 +537,7 @@ static int ceph_key_preparse(struct key_preparsed_payload *prep)
if (ret < 0)
goto err_ckey;
- prep->payload[0] = ckey;
+ prep->payload.data[0] = ckey;
prep->quotalen = datalen;
return 0;
@@ -553,14 +549,14 @@ err:
static void ceph_key_free_preparse(struct key_preparsed_payload *prep)
{
- struct ceph_crypto_key *ckey = prep->payload[0];
+ struct ceph_crypto_key *ckey = prep->payload.data[0];
ceph_crypto_key_destroy(ckey);
kfree(ckey);
}
static void ceph_key_destroy(struct key *key)
{
- struct ceph_crypto_key *ckey = key->payload.data;
+ struct ceph_crypto_key *ckey = key->payload.data[0];
ceph_crypto_key_destroy(ckey);
kfree(ckey);
diff --git a/kernel/net/ceph/crypto.h b/kernel/net/ceph/crypto.h
index d1498224c..2e9cab09f 100644
--- a/kernel/net/ceph/crypto.h
+++ b/kernel/net/ceph/crypto.h
@@ -16,8 +16,10 @@ struct ceph_crypto_key {
static inline void ceph_crypto_key_destroy(struct ceph_crypto_key *key)
{
- if (key)
+ if (key) {
kfree(key->key);
+ key->key = NULL;
+ }
}
int ceph_crypto_key_clone(struct ceph_crypto_key *dst,
diff --git a/kernel/net/ceph/messenger.c b/kernel/net/ceph/messenger.c
index 967080a9f..63ae5dd24 100644
--- a/kernel/net/ceph/messenger.c
+++ b/kernel/net/ceph/messenger.c
@@ -6,6 +6,7 @@
#include <linux/inet.h>
#include <linux/kthread.h>
#include <linux/net.h>
+#include <linux/nsproxy.h>
#include <linux/slab.h>
#include <linux/socket.h>
#include <linux/string.h>
@@ -162,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache;
static char tag_msg = CEPH_MSGR_TAG_MSG;
static char tag_ack = CEPH_MSGR_TAG_ACK;
static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
+static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
#ifdef CONFIG_LOCKDEP
static struct lock_class_key socket_class;
@@ -175,7 +177,7 @@ static struct lock_class_key socket_class;
static void queue_con(struct ceph_connection *con);
static void cancel_con(struct ceph_connection *con);
-static void con_work(struct work_struct *);
+static void ceph_con_workfn(struct work_struct *);
static void con_fault(struct ceph_connection *con);
/*
@@ -275,23 +277,22 @@ static void _ceph_msgr_exit(void)
ceph_msgr_wq = NULL;
}
- ceph_msgr_slab_exit();
-
BUG_ON(zero_page == NULL);
- kunmap(zero_page);
page_cache_release(zero_page);
zero_page = NULL;
+
+ ceph_msgr_slab_exit();
}
int ceph_msgr_init(void)
{
+ if (ceph_msgr_slab_init())
+ return -ENOMEM;
+
BUG_ON(zero_page != NULL);
zero_page = ZERO_PAGE(0);
page_cache_get(zero_page);
- if (ceph_msgr_slab_init())
- return -ENOMEM;
-
/*
* The number of active work items is limited by the number of
* connections, so leave @max_active at default.
@@ -480,8 +481,8 @@ static int ceph_tcp_connect(struct ceph_connection *con)
int ret;
BUG_ON(con->sock);
- ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
- IPPROTO_TCP, &sock);
+ ret = sock_create_kern(read_pnet(&con->msgr->net), paddr->ss_family,
+ SOCK_STREAM, IPPROTO_TCP, &sock);
if (ret)
return ret;
sock->sk->sk_allocation = GFP_NOFS;
@@ -508,7 +509,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
return ret;
}
- if (con->msgr->tcp_nodelay) {
+ if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) {
int optval = 1;
ret = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
@@ -636,9 +637,6 @@ static int con_close_socket(struct ceph_connection *con)
static void ceph_msg_remove(struct ceph_msg *msg)
{
list_del_init(&msg->list_head);
- BUG_ON(msg->con == NULL);
- msg->con->ops->put(msg->con);
- msg->con = NULL;
ceph_msg_put(msg);
}
@@ -661,20 +659,21 @@ static void reset_connection(struct ceph_connection *con)
if (con->in_msg) {
BUG_ON(con->in_msg->con != con);
- con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
- con->ops->put(con);
}
con->connect_seq = 0;
con->out_seq = 0;
if (con->out_msg) {
+ BUG_ON(con->out_msg->con != con);
ceph_msg_put(con->out_msg);
con->out_msg = NULL;
}
con->in_seq = 0;
con->in_seq_acked = 0;
+
+ con->out_skip = 0;
}
/*
@@ -749,7 +748,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
mutex_init(&con->mutex);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
- INIT_DELAYED_WORK(&con->work, con_work);
+ INIT_DELAYED_WORK(&con->work, ceph_con_workfn);
con->state = CON_STATE_CLOSED;
}
@@ -774,6 +773,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
static void con_out_kvec_reset(struct ceph_connection *con)
{
+ BUG_ON(con->out_skip);
+
con->out_kvec_left = 0;
con->out_kvec_bytes = 0;
con->out_kvec_cur = &con->out_kvec[0];
@@ -782,9 +783,9 @@ static void con_out_kvec_reset(struct ceph_connection *con)
static void con_out_kvec_add(struct ceph_connection *con,
size_t size, void *data)
{
- int index;
+ int index = con->out_kvec_left;
- index = con->out_kvec_left;
+ BUG_ON(con->out_skip);
BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
con->out_kvec[index].iov_len = size;
@@ -793,6 +794,27 @@ static void con_out_kvec_add(struct ceph_connection *con,
con->out_kvec_bytes += size;
}
+/*
+ * Chop off a kvec from the end. Return residual number of bytes for
+ * that kvec, i.e. how many bytes would have been written if the kvec
+ * hadn't been nuked.
+ */
+static int con_out_kvec_skip(struct ceph_connection *con)
+{
+ int off = con->out_kvec_cur - con->out_kvec;
+ int skip = 0;
+
+ if (con->out_kvec_bytes > 0) {
+ skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
+ BUG_ON(con->out_kvec_bytes < skip);
+ BUG_ON(!con->out_kvec_left);
+ con->out_kvec_bytes -= skip;
+ con->out_kvec_left--;
+ }
+
+ return skip;
+}
+
#ifdef CONFIG_BLOCK
/*
@@ -1178,6 +1200,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
return new_piece;
}
+static size_t sizeof_footer(struct ceph_connection *con)
+{
+ return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
+ sizeof(struct ceph_msg_footer) :
+ sizeof(struct ceph_msg_footer_old);
+}
+
static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{
BUG_ON(!msg);
@@ -1200,11 +1229,10 @@ static void prepare_write_message_footer(struct ceph_connection *con)
m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
dout("prepare_write_message_footer %p\n", con);
- con->out_kvec_is_msg = true;
con->out_kvec[v].iov_base = &m->footer;
if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
if (con->ops->sign_message)
- con->ops->sign_message(con, m);
+ con->ops->sign_message(m);
else
m->footer.sig = 0;
con->out_kvec[v].iov_len = sizeof(m->footer);
@@ -1228,7 +1256,6 @@ static void prepare_write_message(struct ceph_connection *con)
u32 crc;
con_out_kvec_reset(con);
- con->out_kvec_is_msg = true;
con->out_msg_done = false;
/* Sneak an ack in there first? If we can get it into the same
@@ -1268,18 +1295,19 @@ static void prepare_write_message(struct ceph_connection *con)
/* tag + hdr + front + middle */
con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
- con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+ con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
if (m->middle)
con_out_kvec_add(con, m->middle->vec.iov_len,
m->middle->vec.iov_base);
- /* fill in crc (except data pages), footer */
+ /* fill in hdr crc and finalize hdr */
crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
con->out_msg->hdr.crc = cpu_to_le32(crc);
- con->out_msg->footer.flags = 0;
+ memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
+ /* fill in front and middle crc, footer */
crc = crc32c(0, m->front.iov_base, m->front.iov_len);
con->out_msg->footer.front_crc = cpu_to_le32(crc);
if (m->middle) {
@@ -1291,6 +1319,7 @@ static void prepare_write_message(struct ceph_connection *con)
dout("%s front_crc %u middle_crc %u\n", __func__,
le32_to_cpu(con->out_msg->footer.front_crc),
le32_to_cpu(con->out_msg->footer.middle_crc));
+ con->out_msg->footer.flags = 0;
/* is there a data payload? */
con->out_msg->footer.data_crc = 0;
@@ -1351,7 +1380,16 @@ static void prepare_write_keepalive(struct ceph_connection *con)
{
dout("prepare_write_keepalive %p\n", con);
con_out_kvec_reset(con);
- con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
+ if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
+ struct timespec now = CURRENT_TIME;
+
+ con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
+ ceph_encode_timespec(&con->out_temp_keepalive2, &now);
+ con_out_kvec_add(con, sizeof(con->out_temp_keepalive2),
+ &con->out_temp_keepalive2);
+ } else {
+ con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
+ }
con_flag_set(con, CON_FLAG_WRITE_PENDING);
}
@@ -1422,7 +1460,8 @@ static int prepare_write_connect(struct ceph_connection *con)
dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
con->connect_seq, global_seq, proto);
- con->out_connect.features = cpu_to_le64(con->msgr->supported_features);
+ con->out_connect.features =
+ cpu_to_le64(from_msgr(con->msgr)->supported_features);
con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
con->out_connect.connect_seq = cpu_to_le32(con->connect_seq);
con->out_connect.global_seq = cpu_to_le32(global_seq);
@@ -1485,7 +1524,6 @@ static int write_partial_kvec(struct ceph_connection *con)
}
}
con->out_kvec_left = 0;
- con->out_kvec_is_msg = false;
ret = 1;
out:
dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
@@ -1517,7 +1555,7 @@ static int write_partial_message_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->out_msg;
struct ceph_msg_data_cursor *cursor = &msg->cursor;
- bool do_datacrc = !con->msgr->nocrc;
+ bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
u32 crc;
dout("%s %p msg %p\n", __func__, con, msg);
@@ -1542,10 +1580,10 @@ static int write_partial_message_data(struct ceph_connection *con)
bool need_crc;
int ret;
- page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
- &last_piece);
+ page = ceph_msg_data_next(cursor, &page_offset, &length,
+ &last_piece);
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
- length, last_piece);
+ length, !last_piece);
if (ret <= 0) {
if (do_datacrc)
msg->footer.data_crc = cpu_to_le32(crc);
@@ -1554,7 +1592,7 @@ static int write_partial_message_data(struct ceph_connection *con)
}
if (do_datacrc && cursor->need_crc)
crc = ceph_crc32c_page(crc, page, page_offset, length);
- need_crc = ceph_msg_data_advance(&msg->cursor, (size_t)ret);
+ need_crc = ceph_msg_data_advance(cursor, (size_t)ret);
}
dout("%s %p msg %p done\n", __func__, con, msg);
@@ -1577,6 +1615,7 @@ static int write_partial_skip(struct ceph_connection *con)
{
int ret;
+ dout("%s %p %d left\n", __func__, con, con->out_skip);
while (con->out_skip > 0) {
size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
@@ -1625,6 +1664,12 @@ static void prepare_read_tag(struct ceph_connection *con)
con->in_tag = CEPH_MSGR_TAG_READY;
}
+static void prepare_read_keepalive_ack(struct ceph_connection *con)
+{
+ dout("prepare_read_keepalive_ack %p\n", con);
+ con->in_base_pos = 0;
+}
+
/*
* Prepare to read a message.
*/
@@ -1732,17 +1777,17 @@ static int verify_hello(struct ceph_connection *con)
static bool addr_is_blank(struct sockaddr_storage *ss)
{
+ struct in_addr *addr = &((struct sockaddr_in *)ss)->sin_addr;
+ struct in6_addr *addr6 = &((struct sockaddr_in6 *)ss)->sin6_addr;
+
switch (ss->ss_family) {
case AF_INET:
- return ((struct sockaddr_in *)ss)->sin_addr.s_addr == 0;
+ return addr->s_addr == htonl(INADDR_ANY);
case AF_INET6:
- return
- ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[0] == 0 &&
- ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[1] == 0 &&
- ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[2] == 0 &&
- ((struct sockaddr_in6 *)ss)->sin6_addr.s6_addr32[3] == 0;
+ return ipv6_addr_any(addr6);
+ default:
+ return true;
}
- return false;
}
static int addr_port(struct sockaddr_storage *ss)
@@ -1989,8 +2034,8 @@ static int process_banner(struct ceph_connection *con)
static int process_connect(struct ceph_connection *con)
{
- u64 sup_feat = con->msgr->supported_features;
- u64 req_feat = con->msgr->required_features;
+ u64 sup_feat = from_msgr(con->msgr)->supported_features;
+ u64 req_feat = from_msgr(con->msgr)->required_features;
u64 server_feat = ceph_sanitize_features(
le64_to_cpu(con->in_reply.features));
int ret;
@@ -2216,7 +2261,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
struct ceph_msg_data_cursor *cursor = &msg->cursor;
- const bool do_datacrc = !con->msgr->nocrc;
+ bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
struct page *page;
size_t page_offset;
size_t length;
@@ -2230,8 +2275,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
if (do_datacrc)
crc = con->in_data_crc;
while (cursor->resid) {
- page = ceph_msg_data_next(&msg->cursor, &page_offset, &length,
- NULL);
+ page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
if (ret <= 0) {
if (do_datacrc)
@@ -2242,7 +2286,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
if (do_datacrc)
crc = ceph_crc32c_page(crc, page, page_offset, ret);
- (void) ceph_msg_data_advance(&msg->cursor, (size_t)ret);
+ (void) ceph_msg_data_advance(cursor, (size_t)ret);
}
if (do_datacrc)
con->in_data_crc = crc;
@@ -2262,7 +2306,7 @@ static int read_partial_message(struct ceph_connection *con)
int end;
int ret;
unsigned int front_len, middle_len, data_len;
- bool do_datacrc = !con->msgr->nocrc;
+ bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
u64 seq;
u32 crc;
@@ -2301,9 +2345,9 @@ static int read_partial_message(struct ceph_connection *con)
ceph_pr_addr(&con->peer_addr.in_addr),
seq, con->in_seq + 1);
con->in_base_pos = -front_len - middle_len - data_len -
- sizeof(m->footer);
+ sizeof_footer(con);
con->in_tag = CEPH_MSGR_TAG_READY;
- return 0;
+ return 1;
} else if ((s64)seq - (s64)con->in_seq > 1) {
pr_err("read_partial_message bad seq %lld expected %lld\n",
seq, con->in_seq + 1);
@@ -2322,21 +2366,14 @@ static int read_partial_message(struct ceph_connection *con)
return ret;
BUG_ON(!con->in_msg ^ skip);
- if (con->in_msg && data_len > con->in_msg->data_length) {
- pr_warn("%s skipping long message (%u > %zd)\n",
- __func__, data_len, con->in_msg->data_length);
- ceph_msg_put(con->in_msg);
- con->in_msg = NULL;
- skip = 1;
- }
if (skip) {
/* skip this message */
dout("alloc_msg said skip message\n");
con->in_base_pos = -front_len - middle_len - data_len -
- sizeof(m->footer);
+ sizeof_footer(con);
con->in_tag = CEPH_MSGR_TAG_READY;
con->in_seq++;
- return 0;
+ return 1;
}
BUG_ON(!con->in_msg);
@@ -2414,7 +2451,7 @@ static int read_partial_message(struct ceph_connection *con)
}
if (need_sign && con->ops->check_message_signature &&
- con->ops->check_message_signature(con, m)) {
+ con->ops->check_message_signature(m)) {
pr_err("read_partial_message %p signature check failed\n", m);
return -EBADMSG;
}
@@ -2429,13 +2466,10 @@ static int read_partial_message(struct ceph_connection *con)
*/
static void process_message(struct ceph_connection *con)
{
- struct ceph_msg *msg;
+ struct ceph_msg *msg = con->in_msg;
BUG_ON(con->in_msg->con != con);
- con->in_msg->con = NULL;
- msg = con->in_msg;
con->in_msg = NULL;
- con->ops->put(con);
/* if first message, set peer_name */
if (con->peer_name.type == 0)
@@ -2457,6 +2491,17 @@ static void process_message(struct ceph_connection *con)
mutex_lock(&con->mutex);
}
+static int read_keepalive_ack(struct ceph_connection *con)
+{
+ struct ceph_timespec ceph_ts;
+ size_t size = sizeof(ceph_ts);
+ int ret = read_partial(con, size, size, &ceph_ts);
+ if (ret <= 0)
+ return ret;
+ ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts);
+ prepare_read_tag(con);
+ return 1;
+}
/*
* Write something to the socket. Called in a worker thread when the
@@ -2493,13 +2538,13 @@ more:
more_kvec:
/* kvec data queued? */
- if (con->out_skip) {
- ret = write_partial_skip(con);
+ if (con->out_kvec_left) {
+ ret = write_partial_kvec(con);
if (ret <= 0)
goto out;
}
- if (con->out_kvec_left) {
- ret = write_partial_kvec(con);
+ if (con->out_skip) {
+ ret = write_partial_skip(con);
if (ret <= 0)
goto out;
}
@@ -2526,6 +2571,10 @@ more_kvec:
do_next:
if (con->state == CON_STATE_OPEN) {
+ if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
+ prepare_write_keepalive(con);
+ goto more;
+ }
/* is anything else pending? */
if (!list_empty(&con->out_queue)) {
prepare_write_message(con);
@@ -2535,10 +2584,6 @@ do_next:
prepare_write_ack(con);
goto more;
}
- if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
- prepare_write_keepalive(con);
- goto more;
- }
}
/* Nothing to do! */
@@ -2641,6 +2686,9 @@ more:
case CEPH_MSGR_TAG_ACK:
prepare_read_ack(con);
break;
+ case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+ prepare_read_keepalive_ack(con);
+ break;
case CEPH_MSGR_TAG_CLOSE:
con_close_socket(con);
con->state = CON_STATE_CLOSED;
@@ -2654,7 +2702,7 @@ more:
if (ret <= 0) {
switch (ret) {
case -EBADMSG:
- con->error_msg = "bad crc";
+ con->error_msg = "bad crc/signature";
/* fall through */
case -EBADE:
ret = -EIO;
@@ -2684,6 +2732,12 @@ more:
process_ack(con);
goto more;
}
+ if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+ ret = read_keepalive_ack(con);
+ if (ret <= 0)
+ goto out;
+ goto more;
+ }
out:
dout("try_read done on %p ret %d\n", con, ret);
@@ -2799,7 +2853,7 @@ static void con_fault_finish(struct ceph_connection *con)
/*
* Do some work on a connection. Drop a connection ref when we're done.
*/
-static void con_work(struct work_struct *work)
+static void ceph_con_workfn(struct work_struct *work)
{
struct ceph_connection *con = container_of(work, struct ceph_connection,
work.work);
@@ -2889,10 +2943,8 @@ static void con_fault(struct ceph_connection *con)
if (con->in_msg) {
BUG_ON(con->in_msg->con != con);
- con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
- con->ops->put(con);
}
/* Requeue anything that hasn't been acked */
@@ -2923,15 +2975,8 @@ static void con_fault(struct ceph_connection *con)
* initialize a new messenger instance
*/
void ceph_messenger_init(struct ceph_messenger *msgr,
- struct ceph_entity_addr *myaddr,
- u64 supported_features,
- u64 required_features,
- bool nocrc,
- bool tcp_nodelay)
+ struct ceph_entity_addr *myaddr)
{
- msgr->supported_features = supported_features;
- msgr->required_features = required_features;
-
spin_lock_init(&msgr->global_seq_lock);
if (myaddr)
@@ -2941,15 +2986,29 @@ void ceph_messenger_init(struct ceph_messenger *msgr,
msgr->inst.addr.type = 0;
get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
encode_my_addr(msgr);
- msgr->nocrc = nocrc;
- msgr->tcp_nodelay = tcp_nodelay;
atomic_set(&msgr->stopping, 0);
+ write_pnet(&msgr->net, get_net(current->nsproxy->net_ns));
dout("%s %p\n", __func__, msgr);
}
EXPORT_SYMBOL(ceph_messenger_init);
+void ceph_messenger_fini(struct ceph_messenger *msgr)
+{
+ put_net(read_pnet(&msgr->net));
+}
+EXPORT_SYMBOL(ceph_messenger_fini);
+
+static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
+{
+ if (msg->con)
+ msg->con->ops->put(msg->con);
+
+ msg->con = con ? con->ops->get(con) : NULL;
+ BUG_ON(msg->con != con);
+}
+
static void clear_standby(struct ceph_connection *con)
{
/* come back from STANDBY? */
@@ -2981,9 +3040,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
return;
}
- BUG_ON(msg->con != NULL);
- msg->con = con->ops->get(con);
- BUG_ON(msg->con == NULL);
+ msg_con_set(msg, con);
BUG_ON(!list_empty(&msg->list_head));
list_add_tail(&msg->list_head, &con->out_queue);
@@ -3011,31 +3068,45 @@ void ceph_msg_revoke(struct ceph_msg *msg)
{
struct ceph_connection *con = msg->con;
- if (!con)
+ if (!con) {
+ dout("%s msg %p null con\n", __func__, msg);
return; /* Message not in our possession */
+ }
mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) {
dout("%s %p msg %p - was on queue\n", __func__, con, msg);
list_del_init(&msg->list_head);
- BUG_ON(msg->con == NULL);
- msg->con->ops->put(msg->con);
- msg->con = NULL;
msg->hdr.seq = 0;
ceph_msg_put(msg);
}
if (con->out_msg == msg) {
- dout("%s %p msg %p - was sending\n", __func__, con, msg);
- con->out_msg = NULL;
- if (con->out_kvec_is_msg) {
- con->out_skip = con->out_kvec_bytes;
- con->out_kvec_is_msg = false;
+ BUG_ON(con->out_skip);
+ /* footer */
+ if (con->out_msg_done) {
+ con->out_skip += con_out_kvec_skip(con);
+ } else {
+ BUG_ON(!msg->data_length);
+ if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
+ con->out_skip += sizeof(msg->footer);
+ else
+ con->out_skip += sizeof(msg->old_footer);
}
+ /* data, middle, front */
+ if (msg->data_length)
+ con->out_skip += msg->cursor.total_resid;
+ if (msg->middle)
+ con->out_skip += con_out_kvec_skip(con);
+ con->out_skip += con_out_kvec_skip(con);
+
+ dout("%s %p msg %p - was sending, will write %d skip %d\n",
+ __func__, con, msg, con->out_kvec_bytes, con->out_skip);
msg->hdr.seq = 0;
-
+ con->out_msg = NULL;
ceph_msg_put(msg);
}
+
mutex_unlock(&con->mutex);
}
@@ -3044,16 +3115,13 @@ void ceph_msg_revoke(struct ceph_msg *msg)
*/
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{
- struct ceph_connection *con;
+ struct ceph_connection *con = msg->con;
- BUG_ON(msg == NULL);
- if (!msg->con) {
+ if (!con) {
dout("%s msg %p null con\n", __func__, msg);
-
return; /* Message not in our possession */
}
- con = msg->con;
mutex_lock(&con->mutex);
if (con->in_msg == msg) {
unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
@@ -3094,6 +3162,20 @@ void ceph_con_keepalive(struct ceph_connection *con)
}
EXPORT_SYMBOL(ceph_con_keepalive);
+bool ceph_con_keepalive_expired(struct ceph_connection *con,
+ unsigned long interval)
+{
+ if (interval > 0 &&
+ (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+ struct timespec now = CURRENT_TIME;
+ struct timespec ts;
+ jiffies_to_timespec(interval, &ts);
+ ts = timespec_add(con->last_keepalive_ack, ts);
+ return timespec_compare(&now, &ts) >= 0;
+ }
+ return false;
+}
+
static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
{
struct ceph_msg_data *data;
@@ -3285,9 +3367,8 @@ static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
}
if (msg) {
BUG_ON(*skip);
+ msg_con_set(msg, con);
con->in_msg = msg;
- con->in_msg->con = con->ops->get(con);
- BUG_ON(con->in_msg->con == NULL);
} else {
/*
* Null message pointer means either we should skip
@@ -3334,6 +3415,8 @@ static void ceph_msg_release(struct kref *kref)
dout("%s %p\n", __func__, m);
WARN_ON(!list_empty(&m->list_head));
+ msg_con_set(m, NULL);
+
/* drop middle, data, if any */
if (m->middle) {
ceph_buffer_put(m->middle);
diff --git a/kernel/net/ceph/mon_client.c b/kernel/net/ceph/mon_client.c
index 2b3cf05e8..edda01626 100644
--- a/kernel/net/ceph/mon_client.c
+++ b/kernel/net/ceph/mon_client.c
@@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc)
CEPH_ENTITY_TYPE_MON, monc->cur_mon,
&monc->monmap->mon_inst[monc->cur_mon].addr);
+ /* send an initial keepalive to ensure our timestamp is
+ * valid by the time we are in an OPENED state */
+ ceph_con_keepalive(&monc->con);
+
/* initiatiate authentication handshake */
ret = ceph_auth_build_hello(monc->auth,
monc->m_auth->front.iov_base,
@@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc)
*/
static void __schedule_delayed(struct ceph_mon_client *monc)
{
- unsigned int delay;
+ struct ceph_options *opt = monc->client->options;
+ unsigned long delay;
- if (monc->cur_mon < 0 || __sub_expired(monc))
+ if (monc->cur_mon < 0 || __sub_expired(monc)) {
delay = 10 * HZ;
- else
+ } else {
delay = 20 * HZ;
- dout("__schedule_delayed after %u\n", delay);
- schedule_delayed_work(&monc->delayed_work, delay);
+ if (opt->monc_ping_timeout > 0)
+ delay = min(delay, opt->monc_ping_timeout / 3);
+ }
+ dout("__schedule_delayed after %lu\n", delay);
+ schedule_delayed_work(&monc->delayed_work,
+ round_jiffies_relative(delay));
}
/*
@@ -298,21 +307,28 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
}
EXPORT_SYMBOL(ceph_monc_request_next_osdmap);
+/*
+ * Wait for an osdmap with a given epoch.
+ *
+ * @epoch: epoch to wait for
+ * @timeout: in jiffies, 0 means "wait forever"
+ */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
unsigned long timeout)
{
unsigned long started = jiffies;
- int ret;
+ long ret;
mutex_lock(&monc->mutex);
while (monc->have_osdmap < epoch) {
mutex_unlock(&monc->mutex);
- if (timeout != 0 && time_after_eq(jiffies, started + timeout))
+ if (timeout && time_after_eq(jiffies, started + timeout))
return -ETIMEDOUT;
ret = wait_event_interruptible_timeout(monc->client->auth_wq,
- monc->have_osdmap >= epoch, timeout);
+ monc->have_osdmap >= epoch,
+ ceph_timeout_jiffies(timeout));
if (ret < 0)
return ret;
@@ -736,11 +752,23 @@ static void delayed_work(struct work_struct *work)
__close_session(monc);
__open_session(monc); /* continue hunting */
} else {
- ceph_con_keepalive(&monc->con);
+ struct ceph_options *opt = monc->client->options;
+ int is_auth = ceph_auth_is_authenticated(monc->auth);
+ if (ceph_con_keepalive_expired(&monc->con,
+ opt->monc_ping_timeout)) {
+ dout("monc keepalive timeout\n");
+ is_auth = 0;
+ __close_session(monc);
+ monc->hunting = true;
+ __open_session(monc);
+ }
- __validate_auth(monc);
+ if (!monc->hunting) {
+ ceph_con_keepalive(&monc->con);
+ __validate_auth(monc);
+ }
- if (ceph_auth_is_authenticated(monc->auth))
+ if (is_auth)
__send_subscribe(monc);
}
__schedule_delayed(monc);
diff --git a/kernel/net/ceph/osd_client.c b/kernel/net/ceph/osd_client.c
index c4ec92392..a28e47ff1 100644
--- a/kernel/net/ceph/osd_client.c
+++ b/kernel/net/ceph/osd_client.c
@@ -120,11 +120,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
}
#endif /* CONFIG_BLOCK */
-#define osd_req_op_data(oreq, whch, typ, fld) \
- ({ \
- BUG_ON(whch >= (oreq)->r_num_ops); \
- &(oreq)->r_ops[whch].typ.fld; \
- })
+#define osd_req_op_data(oreq, whch, typ, fld) \
+({ \
+ struct ceph_osd_request *__oreq = (oreq); \
+ unsigned int __whch = (whch); \
+ BUG_ON(__whch >= __oreq->r_num_ops); \
+ &__oreq->r_ops[__whch].typ.fld; \
+})
static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
@@ -285,6 +287,7 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
switch (op->op) {
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_WRITEFULL:
ceph_osd_data_release(&op->extent.osd_data);
break;
case CEPH_OSD_OP_CALL:
@@ -296,6 +299,9 @@ static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
case CEPH_OSD_OP_CMPXATTR:
ceph_osd_data_release(&op->xattr.osd_data);
break;
+ case CEPH_OSD_OP_STAT:
+ ceph_osd_data_release(&op->raw_data_in);
+ break;
default:
break;
}
@@ -450,7 +456,7 @@ __CEPH_FORALL_OSD_OPS(GENERATE_CASE)
*/
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
- u16 opcode)
+ u16 opcode, u32 flags)
{
struct ceph_osd_req_op *op;
@@ -460,14 +466,15 @@ _osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
op = &osd_req->r_ops[which];
memset(op, 0, sizeof (*op));
op->op = opcode;
+ op->flags = flags;
return op;
}
void osd_req_op_init(struct ceph_osd_request *osd_req,
- unsigned int which, u16 opcode)
+ unsigned int which, u16 opcode, u32 flags)
{
- (void)_osd_req_op_init(osd_req, which, opcode);
+ (void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);
@@ -476,17 +483,19 @@ void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
u64 offset, u64 length,
u64 truncate_size, u32 truncate_seq)
{
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+ opcode, 0);
size_t payload_len = 0;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
- opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE);
+ opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
+ opcode != CEPH_OSD_OP_TRUNCATE);
op->extent.offset = offset;
op->extent.length = length;
op->extent.truncate_size = truncate_size;
op->extent.truncate_seq = truncate_seq;
- if (opcode == CEPH_OSD_OP_WRITE)
+ if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
payload_len += length;
op->payload_len = payload_len;
@@ -515,7 +524,8 @@ EXPORT_SYMBOL(osd_req_op_extent_update);
void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *class, const char *method)
{
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+ opcode, 0);
struct ceph_pagelist *pagelist;
size_t payload_len = 0;
size_t size;
@@ -552,7 +562,8 @@ int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *name, const void *value,
size_t size, u8 cmp_op, u8 cmp_mode)
{
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+ opcode, 0);
struct ceph_pagelist *pagelist;
size_t payload_len;
@@ -585,7 +596,8 @@ void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag)
{
- struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
+ struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
+ opcode, 0);
BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
@@ -602,7 +614,8 @@ void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
u64 expected_write_size)
{
struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
- CEPH_OSD_OP_SETALLOCHINT);
+ CEPH_OSD_OP_SETALLOCHINT,
+ 0);
op->alloc_hint.expected_object_size = expected_object_size;
op->alloc_hint.expected_write_size = expected_write_size;
@@ -661,9 +674,11 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
break;
case CEPH_OSD_OP_READ:
case CEPH_OSD_OP_WRITE:
+ case CEPH_OSD_OP_WRITEFULL:
case CEPH_OSD_OP_ZERO:
case CEPH_OSD_OP_TRUNCATE:
- if (src->op == CEPH_OSD_OP_WRITE)
+ if (src->op == CEPH_OSD_OP_WRITE ||
+ src->op == CEPH_OSD_OP_WRITEFULL)
request_data_len = src->extent.length;
dst->extent.offset = cpu_to_le64(src->extent.offset);
dst->extent.length = cpu_to_le64(src->extent.length);
@@ -672,7 +687,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req,
dst->extent.truncate_seq =
cpu_to_le32(src->extent.truncate_seq);
osd_data = &src->extent.osd_data;
- if (src->op == CEPH_OSD_OP_WRITE)
+ if (src->op == CEPH_OSD_OP_WRITE ||
+ src->op == CEPH_OSD_OP_WRITEFULL)
ceph_osdc_msg_data_add(req->r_request, osd_data);
else
ceph_osdc_msg_data_add(req->r_reply, osd_data);
@@ -786,7 +802,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
}
if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
- osd_req_op_init(req, which, opcode);
+ osd_req_op_init(req, which, opcode, 0);
} else {
u32 object_size = le32_to_cpu(layout->fl_object_size);
u32 object_base = off - objoff;
@@ -1088,7 +1104,7 @@ static void __move_osd_to_lru(struct ceph_osd_client *osdc,
BUG_ON(!list_empty(&osd->o_osd_lru));
list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
- osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
+ osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}
static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc,
@@ -1199,7 +1215,7 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
{
schedule_delayed_work(&osdc->timeout_work,
- osdc->client->options->osd_keepalive_timeout * HZ);
+ osdc->client->options->osd_keepalive_timeout);
}
static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
@@ -1567,10 +1583,9 @@ static void handle_timeout(struct work_struct *work)
{
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
+ struct ceph_options *opts = osdc->client->options;
struct ceph_osd_request *req;
struct ceph_osd *osd;
- unsigned long keepalive =
- osdc->client->options->osd_keepalive_timeout * HZ;
struct list_head slow_osds;
dout("timeout\n");
down_read(&osdc->map_sem);
@@ -1586,7 +1601,8 @@ static void handle_timeout(struct work_struct *work)
*/
INIT_LIST_HEAD(&slow_osds);
list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
- if (time_before(jiffies, req->r_stamp + keepalive))
+ if (time_before(jiffies,
+ req->r_stamp + opts->osd_keepalive_timeout))
break;
osd = req->r_osd;
@@ -1613,8 +1629,7 @@ static void handle_osds_timeout(struct work_struct *work)
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client,
osds_timeout_work.work);
- unsigned long delay =
- osdc->client->options->osd_idle_ttl * HZ >> 2;
+ unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
dout("osds timeout\n");
down_read(&osdc->map_sem);
@@ -1737,8 +1752,7 @@ static void complete_request(struct ceph_osd_request *req)
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
*/
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
- struct ceph_connection *con)
+static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
void *p, *end;
struct ceph_osd_request *req;
@@ -2619,7 +2633,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
osdc->event_count = 0;
schedule_delayed_work(&osdc->osds_timeout_work,
- round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
+ round_jiffies_relative(osdc->client->options->osd_idle_ttl));
err = -ENOMEM;
osdc->req_mempool = mempool_create_kmalloc_pool(10,
@@ -2794,7 +2808,7 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
ceph_osdc_handle_map(osdc, msg);
break;
case CEPH_MSG_OSD_OPREPLY:
- handle_reply(osdc, msg, con);
+ handle_reply(osdc, msg);
break;
case CEPH_MSG_WATCH_NOTIFY:
handle_watch_notify(osdc, msg);
@@ -2809,8 +2823,9 @@ out:
}
/*
- * lookup and return message for incoming reply. set up reply message
- * pages.
+ * Lookup and return message for incoming reply. Don't try to do
+ * anything about a larger than preallocated data portion of the
+ * message at the moment - for now, just skip the message.
*/
static struct ceph_msg *get_reply(struct ceph_connection *con,
struct ceph_msg_header *hdr,
@@ -2828,23 +2843,19 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid);
if (!req) {
- *skip = 1;
+ dout("%s osd%d tid %llu unknown, skipping\n", __func__,
+ osd->o_osd, tid);
m = NULL;
- dout("get_reply unknown tid %llu from osd%d\n", tid,
- osd->o_osd);
+ *skip = 1;
goto out;
}
- if (req->r_reply->con)
- dout("%s revoking msg %p from old con %p\n", __func__,
- req->r_reply, req->r_reply->con);
ceph_msg_revoke_incoming(req->r_reply);
if (front_len > req->r_reply->front_alloc_len) {
- pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n",
- front_len, req->r_reply->front_alloc_len,
- (unsigned int)con->peer_name.type,
- le64_to_cpu(con->peer_name.num));
+ pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
+ __func__, osd->o_osd, req->r_tid, front_len,
+ req->r_reply->front_alloc_len);
m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
false);
if (!m)
@@ -2852,37 +2863,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
ceph_msg_put(req->r_reply);
req->r_reply = m;
}
- m = ceph_msg_get(req->r_reply);
-
- if (data_len > 0) {
- struct ceph_osd_data *osd_data;
- /*
- * XXX This is assuming there is only one op containing
- * XXX page data. Probably OK for reads, but this
- * XXX ought to be done more generally.
- */
- osd_data = osd_req_op_extent_osd_data(req, 0);
- if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
- if (osd_data->pages &&
- unlikely(osd_data->length < data_len)) {
-
- pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n",
- tid, data_len, osd_data->length);
- *skip = 1;
- ceph_msg_put(m);
- m = NULL;
- goto out;
- }
- }
+ if (data_len > req->r_reply->data_length) {
+ pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
+ __func__, osd->o_osd, req->r_tid, data_len,
+ req->r_reply->data_length);
+ m = NULL;
+ *skip = 1;
+ goto out;
}
- *skip = 0;
+
+ m = ceph_msg_get(req->r_reply);
dout("get_reply tid %lld %p\n", tid, m);
out:
mutex_unlock(&osdc->request_mutex);
return m;
-
}
static struct ceph_msg *alloc_msg(struct ceph_connection *con,
@@ -2980,17 +2976,19 @@ static int invalidate_authorizer(struct ceph_connection *con)
return ceph_monc_validate_auth(&osdc->client->monc);
}
-static int sign_message(struct ceph_connection *con, struct ceph_msg *msg)
+static int osd_sign_message(struct ceph_msg *msg)
{
- struct ceph_osd *o = con->private;
+ struct ceph_osd *o = msg->con->private;
struct ceph_auth_handshake *auth = &o->o_auth;
+
return ceph_auth_sign_message(auth, msg);
}
-static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg)
+static int osd_check_message_signature(struct ceph_msg *msg)
{
- struct ceph_osd *o = con->private;
+ struct ceph_osd *o = msg->con->private;
struct ceph_auth_handshake *auth = &o->o_auth;
+
return ceph_auth_check_message_signature(auth, msg);
}
@@ -3002,7 +3000,7 @@ static const struct ceph_connection_operations osd_con_ops = {
.verify_authorizer_reply = verify_authorizer_reply,
.invalidate_authorizer = invalidate_authorizer,
.alloc_msg = alloc_msg,
- .sign_message = sign_message,
- .check_message_signature = check_message_signature,
+ .sign_message = osd_sign_message,
+ .check_message_signature = osd_check_message_signature,
.fault = osd_reset,
};
diff --git a/kernel/net/ceph/osdmap.c b/kernel/net/ceph/osdmap.c
index 4a3125836..7d8f581d9 100644
--- a/kernel/net/ceph/osdmap.c
+++ b/kernel/net/ceph/osdmap.c
@@ -1300,7 +1300,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
ceph_decode_addr(&addr);
pr_info("osd%d up\n", osd);
BUG_ON(osd >= map->max_osd);
- map->osd_state[osd] |= CEPH_OSD_UP;
+ map->osd_state[osd] |= CEPH_OSD_UP | CEPH_OSD_EXISTS;
map->osd_addr[osd] = addr;
}
diff --git a/kernel/net/ceph/pagevec.c b/kernel/net/ceph/pagevec.c
index 096d91447..d4f5f220a 100644
--- a/kernel/net/ceph/pagevec.c
+++ b/kernel/net/ceph/pagevec.c
@@ -51,10 +51,7 @@ void ceph_put_page_vector(struct page **pages, int num_pages, bool dirty)
set_page_dirty_lock(pages[i]);
put_page(pages[i]);
}
- if (is_vmalloc_addr(pages))
- vfree(pages);
- else
- kfree(pages);
+ kvfree(pages);
}
EXPORT_SYMBOL(ceph_put_page_vector);