summaryrefslogtreecommitdiffstats
path: root/kernel/net/sunrpc/xprtrdma/rpc_rdma.c
diff options
context:
space:
mode:
Diffstat (limited to 'kernel/net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--kernel/net/sunrpc/xprtrdma/rpc_rdma.c353
1 files changed, 206 insertions, 147 deletions
diff --git a/kernel/net/sunrpc/xprtrdma/rpc_rdma.c b/kernel/net/sunrpc/xprtrdma/rpc_rdma.c
index 2c53ea9e1..c10d96994 100644
--- a/kernel/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/kernel/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -71,6 +71,67 @@ static const char transfertypes[][12] = {
};
#endif
+/* The client can send a request inline as long as the RPCRDMA header
+ * plus the RPC call fit under the transport's inline limit. If the
+ * combined call message size exceeds that limit, the client must use
+ * the read chunk list for this operation.
+ */
+static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
+{
+ unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len;
+
+ return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+}
+
+/* The client can't know how large the actual reply will be. Thus it
+ * plans for the largest possible reply for that particular ULP
+ * operation. If the maximum combined reply message size exceeds that
+ * limit, the client must provide a write list or a reply chunk for
+ * this request.
+ */
+static bool rpcrdma_results_inline(struct rpc_rqst *rqst)
+{
+ unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen;
+
+ return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+}
+
+static int
+rpcrdma_tail_pullup(struct xdr_buf *buf)
+{
+ size_t tlen = buf->tail[0].iov_len;
+ size_t skip = tlen & 3;
+
+ /* Do not include the tail if it is only an XDR pad */
+ if (tlen < 4)
+ return 0;
+
+ /* xdr_write_pages() adds a pad at the beginning of the tail
+ * if the content in "buf->pages" is unaligned. Force the
+ * tail's actual content to land at the next XDR position
+ * after the head instead.
+ */
+ if (skip) {
+ unsigned char *src, *dst;
+ unsigned int count;
+
+ src = buf->tail[0].iov_base;
+ dst = buf->head[0].iov_base;
+ dst += buf->head[0].iov_len;
+
+ src += skip;
+ tlen -= skip;
+
+ dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
+ __func__, skip, dst, src, tlen);
+
+ for (count = tlen; count; count--)
+ *dst++ = *src++;
+ }
+
+ return tlen;
+}
+
/*
* Chunk assembly from upper layer xdr_buf.
*
@@ -122,6 +183,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
if (len && n == nsegs)
return -EIO;
+ /* When encoding the read list, the tail is always sent inline */
+ if (type == rpcrdma_readch)
+ return n;
+
if (xdrbuf->tail[0].iov_len) {
/* the rpcrdma protocol allows us to omit any trailing
* xdr pad bytes, saving the server an RDMA operation. */
@@ -284,9 +349,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
return (unsigned char *)iptr - (unsigned char *)headerp;
out:
- if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
- return n;
-
for (pos = 0; nchunks--;)
pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
&req->rl_segments[pos]);
@@ -300,8 +362,7 @@ out:
* pre-registered memory buffer for this request. For small amounts
* of data, this is efficient. The cutoff value is tunable.
*/
-static int
-rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
+static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
{
int i, npages, curlen;
int copy_len;
@@ -313,16 +374,9 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
destp = rqst->rq_svec[0].iov_base;
curlen = rqst->rq_svec[0].iov_len;
destp += curlen;
- /*
- * Do optional padding where it makes sense. Alignment of write
- * payload can help the server, if our setting is accurate.
- */
- pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
- if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
- pad = 0; /* don't pad this request */
- dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n",
- __func__, pad, destp, rqst->rq_slen, curlen);
+ dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
+ __func__, destp, rqst->rq_slen, curlen);
copy_len = rqst->rq_snd_buf.page_len;
@@ -358,7 +412,6 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
page_base = 0;
}
/* header now contains entire send message */
- return pad;
}
/*
@@ -383,11 +436,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
char *base;
- size_t rpclen, padlen;
+ size_t rpclen;
ssize_t hdrlen;
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
+ return rpcrdma_bc_marshal_reply(rqst);
+#endif
+
/*
* rpclen gets amount of data in first buffer, which is the
* pre-registered buffer.
@@ -405,28 +463,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
/*
* Chunks needed for results?
*
+ * o Read ops return data as write chunk(s), header as inline.
* o If the expected result is under the inline threshold, all ops
- * return as inline (but see later).
+ * return as inline.
* o Large non-read ops return as a single reply chunk.
- * o Large read ops return data as write chunk(s), header as inline.
- *
- * Note: the NFS code sending down multiple result segments implies
- * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
- */
-
- /*
- * This code can handle read chunks, write chunks OR reply
- * chunks -- only one type. If the request is too big to fit
- * inline, then we will choose read chunks. If the request is
- * a READ, then use write chunks to separate the file data
- * into pages; otherwise use reply chunks.
*/
- if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
- wtype = rpcrdma_noch;
- else if (rqst->rq_rcv_buf.page_len == 0)
- wtype = rpcrdma_replych;
- else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+ if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
wtype = rpcrdma_writech;
+ else if (rpcrdma_results_inline(rqst))
+ wtype = rpcrdma_noch;
else
wtype = rpcrdma_replych;
@@ -435,21 +480,25 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*
* o If the total request is under the inline threshold, all ops
* are sent as inline.
- * o Large non-write ops are sent with the entire message as a
- * single read chunk (protocol 0-position special case).
* o Large write ops transmit data as read chunk(s), header as
* inline.
+ * o Large non-write ops are sent with the entire message as a
+ * single read chunk (protocol 0-position special case).
*
- * Note: the NFS code sending down multiple argument segments
- * implies the op is a write.
- * TBD check NFSv4 setacl
+ * This assumes that the upper layer does not present a request
+ * that both has a data payload, and whose non-data arguments
+ * by themselves are larger than the inline threshold.
*/
- if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+ if (rpcrdma_args_inline(rqst)) {
rtype = rpcrdma_noch;
- else if (rqst->rq_snd_buf.page_len == 0)
- rtype = rpcrdma_areadch;
- else
+ } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
+ } else {
+ r_xprt->rx_stats.nomsg_call_count++;
+ headerp->rm_type = htonl(RDMA_NOMSG);
+ rtype = rpcrdma_areadch;
+ rpclen = 0;
+ }
/* The following simplification is not true forever */
if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
@@ -461,7 +510,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
}
hdrlen = RPCRDMA_HDRLEN_MIN;
- padlen = 0;
/*
* Pull up any extra send data into the preregistered buffer.
@@ -470,45 +518,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*/
if (rtype == rpcrdma_noch) {
- padlen = rpcrdma_inline_pullup(rqst,
- RPCRDMA_INLINE_PAD_VALUE(rqst));
-
- if (padlen) {
- headerp->rm_type = rdma_msgp;
- headerp->rm_body.rm_padded.rm_align =
- cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
- headerp->rm_body.rm_padded.rm_thresh =
- cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
- headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
- headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
- headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
- hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
- if (wtype != rpcrdma_noch) {
- dprintk("RPC: %s: invalid chunk list\n",
- __func__);
- return -EIO;
- }
- } else {
- headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
- headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
- headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
- /* new length after pullup */
- rpclen = rqst->rq_svec[0].iov_len;
- /*
- * Currently we try to not actually use read inline.
- * Reply chunks have the desirable property that
- * they land, packed, directly in the target buffers
- * without headers, so they require no fixup. The
- * additional RDMA Write op sends the same amount
- * of data, streams on-the-wire and adds no overhead
- * on receive. Therefore, we request a reply chunk
- * for non-writes wherever feasible and efficient.
- */
- if (wtype == rpcrdma_noch)
- wtype = rpcrdma_replych;
- }
- }
+ rpcrdma_inline_pullup(rqst);
+ headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
+ headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
+ headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
+ /* new length after pullup */
+ rpclen = rqst->rq_svec[0].iov_len;
+ } else if (rtype == rpcrdma_readch)
+ rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
if (rtype != rpcrdma_noch) {
hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
headerp, rtype);
@@ -521,9 +539,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
if (hdrlen < 0)
return hdrlen;
- dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
+ dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd"
" headerp 0x%p base 0x%p lkey 0x%x\n",
- __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+ __func__, transfertypes[wtype], hdrlen, rpclen,
headerp, base, rdmab_lkey(req->rl_rdmabuf));
/*
@@ -537,26 +555,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
+ req->rl_niovs = 1;
+ if (rtype == rpcrdma_areadch)
+ return 0;
+
req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
req->rl_niovs = 2;
-
- if (padlen) {
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
-
- req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
- req->rl_send_iov[2].length = padlen;
- req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);
-
- req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
- req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
- req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
-
- req->rl_niovs = 4;
- }
-
return 0;
}
@@ -709,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
spin_unlock_bh(&xprt->transport_lock);
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+{
+ __be32 *p = (__be32 *)headerp;
+
+ if (headerp->rm_type != rdma_msg)
+ return false;
+ if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+ return false;
+ if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+ return false;
+ if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+ return false;
+
+ /* sanity */
+ if (p[7] != headerp->rm_xid)
+ return false;
+ /* call direction */
+ if (p[8] != cpu_to_be32(RPC_CALL))
+ return false;
+
+ return true;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/*
* This function is called when an async event is posted to
* the connection which changes the connection state. All it
@@ -721,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
schedule_delayed_work(&ep->rep_connect_worker, 0);
}
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
*/
@@ -732,60 +770,39 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
- struct rpc_xprt *xprt = rep->rr_xprt;
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
int rdmalen, status;
unsigned long cwnd;
u32 credits;
- /* Check status. If bad, signal disconnect and return rep to pool */
- if (rep->rr_len == ~0U) {
- rpcrdma_recv_buffer_put(rep);
- if (r_xprt->rx_ep.rep_connected == 1) {
- r_xprt->rx_ep.rep_connected = -EIO;
- rpcrdma_conn_func(&r_xprt->rx_ep);
- }
- return;
- }
- if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
- dprintk("RPC: %s: short/invalid reply\n", __func__);
- goto repost;
- }
+ dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
+
+ if (rep->rr_len == RPCRDMA_BAD_LEN)
+ goto out_badstatus;
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+ goto out_shortreply;
+
headerp = rdmab_to_msg(rep->rr_rdmabuf);
- if (headerp->rm_vers != rpcrdma_version) {
- dprintk("RPC: %s: invalid version %d\n",
- __func__, be32_to_cpu(headerp->rm_vers));
- goto repost;
- }
+ if (headerp->rm_vers != rpcrdma_version)
+ goto out_badversion;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (rpcrdma_is_bcall(headerp))
+ goto out_bcall;
+#endif
- /* Get XID and try for a match. */
- spin_lock(&xprt->transport_lock);
+ /* Match incoming rpcrdma_rep to an rpcrdma_req to
+ * get context for handling any incoming chunks.
+ */
+ spin_lock_bh(&xprt->transport_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
- if (rqst == NULL) {
- spin_unlock(&xprt->transport_lock);
- dprintk("RPC: %s: reply 0x%p failed "
- "to match any request xid 0x%08x len %d\n",
- __func__, rep, be32_to_cpu(headerp->rm_xid),
- rep->rr_len);
-repost:
- r_xprt->rx_stats.bad_reply_count++;
- rep->rr_func = rpcrdma_reply_handler;
- if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
- rpcrdma_recv_buffer_put(rep);
-
- return;
- }
+ if (!rqst)
+ goto out_nomatch;
- /* get request object */
req = rpcr_to_rdmar(rqst);
- if (req->rl_reply) {
- spin_unlock(&xprt->transport_lock);
- dprintk("RPC: %s: duplicate reply 0x%p to RPC "
- "request 0x%p: xid 0x%08x\n", __func__, rep, req,
- be32_to_cpu(headerp->rm_xid));
- goto repost;
- }
+ if (req->rl_reply)
+ goto out_duplicate;
dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
" RPC request 0x%p xid 0x%08x\n",
@@ -882,8 +899,50 @@ badheader:
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
+ xprt_complete_rqst(rqst->rq_task, status);
+ spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);
- xprt_complete_rqst(rqst->rq_task, status);
- spin_unlock(&xprt->transport_lock);
+ return;
+
+out_badstatus:
+ rpcrdma_recv_buffer_put(rep);
+ if (r_xprt->rx_ep.rep_connected == 1) {
+ r_xprt->rx_ep.rep_connected = -EIO;
+ rpcrdma_conn_func(&r_xprt->rx_ep);
+ }
+ return;
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+out_bcall:
+ rpcrdma_bc_receive_call(r_xprt, rep);
+ return;
+#endif
+
+out_shortreply:
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
+ goto repost;
+
+out_badversion:
+ dprintk("RPC: %s: invalid version %d\n",
+ __func__, be32_to_cpu(headerp->rm_vers));
+ goto repost;
+
+out_nomatch:
+ spin_unlock_bh(&xprt->transport_lock);
+ dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
+ __func__, be32_to_cpu(headerp->rm_xid),
+ rep->rr_len);
+ goto repost;
+
+out_duplicate:
+ spin_unlock_bh(&xprt->transport_lock);
+ dprintk("RPC: %s: "
+ "duplicate reply %p to RPC request %p: xid 0x%08x\n",
+ __func__, rep, req, be32_to_cpu(headerp->rm_xid));
+
+repost:
+ r_xprt->rx_stats.bad_reply_count++;
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ rpcrdma_recv_buffer_put(rep);
}