Diffstat (limited to 'kernel/net/rds/ib_recv.c')
-rw-r--r--  kernel/net/rds/ib_recv.c | 198
 1 file changed, 95 insertions(+), 103 deletions(-)
diff --git a/kernel/net/rds/ib_recv.c b/kernel/net/rds/ib_recv.c
index 1b981a4e4..977fb8606 100644
--- a/kernel/net/rds/ib_recv.c
+++ b/kernel/net/rds/ib_recv.c
@@ -62,12 +62,12 @@ void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
sge = &recv->r_sge[0];
sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
sge->length = sizeof(struct rds_header);
- sge->lkey = ic->i_mr->lkey;
+ sge->lkey = ic->i_pd->local_dma_lkey;
sge = &recv->r_sge[1];
sge->addr = 0;
sge->length = RDS_FRAG_SIZE;
- sge->lkey = ic->i_mr->lkey;
+ sge->lkey = ic->i_pd->local_dma_lkey;
}
}
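
All three lkey hunks in this patch make the same substitution: instead of an lkey from a per-connection DMA MR (ic->i_mr), the SGEs use the local DMA lkey that every protection domain exposes, so no per-connection ib_get_dma_mr() registration is needed. A minimal sketch of the pattern, assuming a helper name (rds_fill_dma_sge) that is not in the patch:

#include <rdma/ib_verbs.h>

/* Build an SGE against the PD's device-wide DMA lkey.
 * Helper name is hypothetical; the field usage mirrors the hunks above.
 */
static void rds_fill_dma_sge(struct ib_pd *pd, struct ib_sge *sge,
			     u64 dma_addr, u32 len)
{
	sge->addr   = dma_addr;            /* from ib_dma_map_single()/_sg() */
	sge->length = len;                 /* bytes the HCA may DMA here */
	sge->lkey   = pd->local_dma_lkey;  /* valid for all kernel DMA memory */
}
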
@@ -297,7 +297,7 @@ static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic
}
static int rds_ib_recv_refill_one(struct rds_connection *conn,
- struct rds_ib_recv_work *recv, int prefill)
+ struct rds_ib_recv_work *recv, gfp_t gfp)
{
struct rds_ib_connection *ic = conn->c_transport_data;
struct ib_sge *sge;
@@ -305,7 +305,7 @@ static int rds_ib_recv_refill_one(struct rds_connection *conn,
gfp_t slab_mask = GFP_NOWAIT;
gfp_t page_mask = GFP_NOWAIT;
- if (prefill) {
+ if (gfp & __GFP_DIRECT_RECLAIM) {
slab_mask = GFP_KERNEL;
page_mask = GFP_HIGHUSER;
}
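
Switching the parameter from a prefill flag to a gfp_t lets one function serve both sleeping and atomic callers: testing __GFP_DIRECT_RECLAIM asks "may this caller block?", the same test the gfpflags_allow_blocking() helper wraps. A sketch of the mask selection under that reading:

#include <linux/gfp.h>

/* Derive slab/page allocation masks from what the caller can tolerate. */
static void rds_pick_alloc_masks(gfp_t gfp, gfp_t *slab_mask, gfp_t *page_mask)
{
	if (gfp & __GFP_DIRECT_RECLAIM) {   /* caller may sleep in reclaim */
		*slab_mask = GFP_KERNEL;
		*page_mask = GFP_HIGHUSER;
	} else {                            /* softirq/atomic caller */
		*slab_mask = GFP_NOWAIT;
		*page_mask = GFP_NOWAIT;
	}
}
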
@@ -347,6 +347,24 @@ out:
return ret;
}
+static int acquire_refill(struct rds_connection *conn)
+{
+ return test_and_set_bit(RDS_RECV_REFILL, &conn->c_flags) == 0;
+}
+
+static void release_refill(struct rds_connection *conn)
+{
+ clear_bit(RDS_RECV_REFILL, &conn->c_flags);
+
+ /* We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+ * hot path and finding waiters is very rare. We don't want to walk
+ * the system-wide hashed waitqueue buckets in the fast path only to
+ * almost never find waiters.
+ */
+ if (waitqueue_active(&conn->c_waitq))
+ wake_up_all(&conn->c_waitq);
+}
+
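
acquire_refill()/release_refill() implement a trylock on a connection flag bit: test_and_set_bit() returns the old bit value, so a zero return means this CPU took ownership. The waitqueue_active() check keeps the common no-waiter release cheap, as the comment explains. A sketch of the rare waiter side, assuming the shutdown path in the companion rds core change waits roughly like this (not code from this file):

#include <linux/wait.h>
#include <linux/bitops.h>
#include "rds.h"   /* struct rds_connection, RDS_RECV_REFILL (assumed in scope) */

/* Block until no CPU holds the refill bit; woken by release_refill(). */
static void rds_wait_refill_idle(struct rds_connection *conn)
{
	wait_event(conn->c_waitq,
		   !test_bit(RDS_RECV_REFILL, &conn->c_flags));
}
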
/*
* This tries to allocate and post unused work requests after making sure that
* they have all the allocations they need to queue received fragments into
@@ -354,15 +372,23 @@ out:
*
* -1 is returned if posting fails due to temporary resource exhaustion.
*/
-void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
+void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
{
struct rds_ib_connection *ic = conn->c_transport_data;
struct rds_ib_recv_work *recv;
struct ib_recv_wr *failed_wr;
unsigned int posted = 0;
int ret = 0;
+ bool can_wait = !!(gfp & __GFP_DIRECT_RECLAIM);
u32 pos;
+ /* the goal here is to just make sure that someone, somewhere
+ * is posting buffers. If we can't get the refill lock,
+ * let them do their thing
+ */
+ if (!acquire_refill(conn))
+ return;
+
while ((prefill || rds_conn_up(conn)) &&
rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
if (pos >= ic->i_recv_ring.w_nr) {
@@ -372,7 +398,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
}
recv = &ic->i_recvs[pos];
- ret = rds_ib_recv_refill_one(conn, recv, prefill);
+ ret = rds_ib_recv_refill_one(conn, recv, gfp);
if (ret) {
break;
}
@@ -402,6 +428,24 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill)
if (ret)
rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
+
+ release_refill(conn);
+
+ /* if we're called from the softirq handler, we'll be GFP_NOWAIT.
+ * in this case the ring being low is going to lead to more interrupts
+ * and we can safely let the softirq code take care of it unless the
+ * ring is completely empty.
+ *
+ * if we're called from krdsd, we'll be GFP_KERNEL. In this case
+ * we might have raced with the softirq code while we had the refill
+ * lock held. Use rds_ib_ring_low() instead of ring_empty to decide
+ * if we should requeue.
+ */
+ if (rds_conn_up(conn) &&
+ ((can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
+ rds_ib_ring_empty(&ic->i_recv_ring))) {
+ queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+ }
}
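
With the gfp plumbed through, the refill entry point is safe to call from every context that notices a low ring; the requeue logic above deliberately applies the looser ring-low test only for sleeping callers, which may have raced with softirq while holding the refill bit. A sketch of the three caller contexts, wrapped in a hypothetical function purely for illustration:

/* Hypothetical wrapper; conn and rds_ib_recv_refill() come from the
 * surrounding RDS code. Each line matches a real call site in the series.
 */
static void rds_refill_call_sites(struct rds_connection *conn)
{
	rds_ib_recv_refill(conn, 1, GFP_KERNEL);  /* prefill at connection setup */
	rds_ib_recv_refill(conn, 0, GFP_NOWAIT);  /* CQ/softirq path: must not sleep */
	rds_ib_recv_refill(conn, 0, GFP_KERNEL);  /* krdsd worker: may block */
}
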
/*
@@ -520,7 +564,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
sge->addr = ic->i_ack_dma;
sge->length = sizeof(struct rds_header);
- sge->lkey = ic->i_mr->lkey;
+ sge->lkey = ic->i_pd->local_dma_lkey;
wr->sg_list = sge;
wr->num_sge = 1;
@@ -552,8 +596,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
* wr_id and avoids working with the ring in that case.
*/
#ifndef KERNEL_HAS_ATOMIC64
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
- int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
{
unsigned long flags;
@@ -578,8 +621,7 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
return seq;
}
#else
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
- int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
{
atomic64_set(&ic->i_ack_next, seq);
if (ack_required) {
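
Dropping static from rds_ib_set_ack() exports it now that completion handling moves out of this file. For reference, the KERNEL_HAS_ATOMIC64 variant shown here is lock-free: the sequence lives in an atomic64_t and a flag bit requests an explicit ack. A sketch that completes the truncated body above, assuming it finishes the way the visible pattern suggests (IB_ACK_REQUESTED is the real RDS flag bit):

#include <linux/atomic.h>
#include <linux/bitops.h>

static void set_ack_sketch(atomic64_t *i_ack_next, unsigned long *i_ack_flags,
			   u64 seq, int ack_required)
{
	atomic64_set(i_ack_next, seq);          /* publish newest seq to ack */
	if (ack_required) {
		smp_mb__before_atomic();        /* order seq store vs. flag */
		set_bit(IB_ACK_REQUESTED, i_ack_flags);
	}
}
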
@@ -786,20 +828,6 @@ static void rds_ib_cong_recv(struct rds_connection *conn,
rds_cong_map_updated(map, uncongested);
}
-/*
- * Rings are posted with all the allocations they'll need to queue the
- * incoming message to the receiving socket so this can't fail.
- * All fragments start with a header, so we can make sure we're not receiving
- * garbage, and we can tell a small 8 byte fragment from an ACK frame.
- */
-struct rds_ib_ack_state {
- u64 ack_next;
- u64 ack_recv;
- unsigned int ack_required:1;
- unsigned int ack_next_valid:1;
- unsigned int ack_recv_valid:1;
-};
-
static void rds_ib_process_recv(struct rds_connection *conn,
struct rds_ib_recv_work *recv, u32 data_len,
struct rds_ib_ack_state *state)
@@ -925,89 +953,50 @@ static void rds_ib_process_recv(struct rds_connection *conn,
}
}
-/*
- * Plucking the oldest entry from the ring can be done concurrently with
- * the thread refilling the ring. Each ring operation is protected by
- * spinlocks and the transient state of refilling doesn't change the
- * recording of which entry is oldest.
- *
- * This relies on IB only calling one cq comp_handler for each cq so that
- * there will only be one caller of rds_recv_incoming() per RDS connection.
- */
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
-{
- struct rds_connection *conn = context;
- struct rds_ib_connection *ic = conn->c_transport_data;
-
- rdsdebug("conn %p cq %p\n", conn, cq);
-
- rds_ib_stats_inc(s_ib_rx_cq_call);
-
- tasklet_schedule(&ic->i_recv_tasklet);
-}
-
-static inline void rds_poll_cq(struct rds_ib_connection *ic,
- struct rds_ib_ack_state *state)
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
+ struct ib_wc *wc,
+ struct rds_ib_ack_state *state)
{
struct rds_connection *conn = ic->conn;
- struct ib_wc wc;
struct rds_ib_recv_work *recv;
- while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
- rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
- (unsigned long long)wc.wr_id, wc.status,
- rds_ib_wc_status_str(wc.status), wc.byte_len,
- be32_to_cpu(wc.ex.imm_data));
- rds_ib_stats_inc(s_ib_rx_cq_event);
-
- recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
+ rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+ (unsigned long long)wc->wr_id, wc->status,
+ ib_wc_status_msg(wc->status), wc->byte_len,
+ be32_to_cpu(wc->ex.imm_data));
- ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
-
- /*
- * Also process recvs in connecting state because it is possible
- * to get a recv completion _before_ the rdmacm ESTABLISHED
- * event is processed.
- */
- if (wc.status == IB_WC_SUCCESS) {
- rds_ib_process_recv(conn, recv, wc.byte_len, state);
- } else {
- /* We expect errors as the qp is drained during shutdown */
- if (rds_conn_up(conn) || rds_conn_connecting(conn))
- rds_ib_conn_error(conn, "recv completion on %pI4 had "
- "status %u (%s), disconnecting and "
- "reconnecting\n", &conn->c_faddr,
- wc.status,
- rds_ib_wc_status_str(wc.status));
- }
+ rds_ib_stats_inc(s_ib_rx_cq_event);
+ recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
+ ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1,
+ DMA_FROM_DEVICE);
- /*
- * It's very important that we only free this ring entry if we've truly
- * freed the resources allocated to the entry. The refilling path can
- * leak if we don't.
- */
- rds_ib_ring_free(&ic->i_recv_ring, 1);
+ /* Also process recvs in connecting state because it is possible
+ * to get a recv completion _before_ the rdmacm ESTABLISHED
+ * event is processed.
+ */
+ if (wc->status == IB_WC_SUCCESS) {
+ rds_ib_process_recv(conn, recv, wc->byte_len, state);
+ } else {
+ /* We expect errors as the qp is drained during shutdown */
+ if (rds_conn_up(conn) || rds_conn_connecting(conn))
+ rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+ &conn->c_faddr,
+ wc->status,
+ ib_wc_status_msg(wc->status));
}
-}
-
-void rds_ib_recv_tasklet_fn(unsigned long data)
-{
- struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
- struct rds_connection *conn = ic->conn;
- struct rds_ib_ack_state state = { 0, };
-
- rds_poll_cq(ic, &state);
- ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
- rds_poll_cq(ic, &state);
- if (state.ack_next_valid)
- rds_ib_set_ack(ic, state.ack_next, state.ack_required);
- if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
- rds_send_drop_acked(conn, state.ack_recv, NULL);
- ic->i_ack_recv = state.ack_recv;
+ /* rds_ib_process_recv() doesn't always consume the frag, and
+ * we might not have called it at all if the wc didn't indicate
+ * success. We already unmapped the frag's pages, though, and
+ * the following rds_ib_ring_free() call tells the refill path
+ * that it will not find an allocated frag here. Make sure we
+ * keep that promise by freeing a frag that's still on the ring.
+ */
+ if (recv->r_frag) {
+ rds_ib_frag_free(ic, recv->r_frag);
+ recv->r_frag = NULL;
}
- if (rds_conn_up(conn))
- rds_ib_attempt_ack(ic);
+ rds_ib_ring_free(&ic->i_recv_ring, 1);
/* If we ever end up with a really empty receive ring, we're
* in deep trouble, as the sender will definitely see RNR
@@ -1016,7 +1005,7 @@ void rds_ib_recv_tasklet_fn(unsigned long data)
rds_ib_stats_inc(s_ib_rx_ring_empty);
if (rds_ib_ring_low(&ic->i_recv_ring))
- rds_ib_recv_refill(conn, 0);
+ rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
}
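
The handler above replaces both the old comp_handler and the tasklet poll loop: connection-level code is now expected to drain the CQ and feed completions in one at a time, accumulating the ack state across the whole batch and applying it once. A sketch of what such a caller could look like (poll_rcq, wcs, and RDS_IB_WC_MAX are assumptions about the companion ib_cm.c change, not code in this file; the rds_ib_ack_state definition presumably moves to a shared header, given the new extern signature):

#include <rdma/ib_verbs.h>

static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,
		     struct ib_wc *wcs, struct rds_ib_ack_state *ack_state)
{
	int nr, i;

	/* Drain completions in batches; one wc == one receive ring entry. */
	while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0)
		for (i = 0; i < nr; i++)
			rds_ib_recv_cqe_handler(ic, &wcs[i], ack_state);
}
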
int rds_ib_recv(struct rds_connection *conn)
@@ -1025,8 +1014,10 @@ int rds_ib_recv(struct rds_connection *conn)
int ret = 0;
rdsdebug("conn %p\n", conn);
- if (rds_conn_up(conn))
+ if (rds_conn_up(conn)) {
rds_ib_attempt_ack(ic);
+ rds_ib_recv_refill(conn, 0, GFP_KERNEL);
+ }
return ret;
}
@@ -1049,9 +1040,10 @@ int rds_ib_recv_init(void)
rds_ib_frag_slab = kmem_cache_create("rds_ib_frag",
sizeof(struct rds_page_frag),
0, SLAB_HWCACHE_ALIGN, NULL);
- if (!rds_ib_frag_slab)
+ if (!rds_ib_frag_slab) {
kmem_cache_destroy(rds_ib_incoming_slab);
- else
+ rds_ib_incoming_slab = NULL;
+ } else
ret = 0;
out:
return ret;
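
The last hunk is error-path hygiene: after destroying rds_ib_incoming_slab on a partial-init failure, the pointer is cleared so a later rds_ib_recv_exit() (or a retried init) cannot call kmem_cache_destroy() on it twice. The resulting shape of the init path, sketched with the real cache names but a simplified body (struct definitions come from the RDS headers):

#include <linux/slab.h>
#include "ib.h"   /* struct rds_ib_incoming, struct rds_page_frag (assumed) */

static struct kmem_cache *rds_ib_incoming_slab;
static struct kmem_cache *rds_ib_frag_slab;

static int recv_init_sketch(void)
{
	rds_ib_incoming_slab = kmem_cache_create("rds_ib_incoming",
						 sizeof(struct rds_ib_incoming),
						 0, SLAB_HWCACHE_ALIGN, NULL);
	if (!rds_ib_incoming_slab)
		return -ENOMEM;

	rds_ib_frag_slab = kmem_cache_create("rds_ib_frag",
					     sizeof(struct rds_page_frag),
					     0, SLAB_HWCACHE_ALIGN, NULL);
	if (!rds_ib_frag_slab) {
		kmem_cache_destroy(rds_ib_incoming_slab);
		rds_ib_incoming_slab = NULL;  /* don't destroy twice on exit */
		return -ENOMEM;
	}
	return 0;
}
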