diff options
Diffstat (limited to 'kernel/net/rds/send.c')
-rw-r--r-- | kernel/net/rds/send.c | 80 |
1 files changed, 56 insertions, 24 deletions
diff --git a/kernel/net/rds/send.c b/kernel/net/rds/send.c index e9430f537..c9cdb358e 100644 --- a/kernel/net/rds/send.c +++ b/kernel/net/rds/send.c @@ -38,6 +38,7 @@ #include <linux/list.h> #include <linux/ratelimit.h> #include <linux/export.h> +#include <linux/sizes.h> #include "rds.h" @@ -51,7 +52,7 @@ * it to 0 will restore the old behavior (where we looped until we had * drained the queue). */ -static int send_batch_count = 64; +static int send_batch_count = SZ_1K; module_param(send_batch_count, int, 0444); MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue"); @@ -223,7 +224,7 @@ restart: * through a lot of messages, lets back off and see * if anyone else jumps in */ - if (batch_count >= 1024) + if (batch_count >= send_batch_count) goto over_batch; spin_lock_irqsave(&conn->c_lock, flags); @@ -282,26 +283,34 @@ restart: /* The transport either sends the whole rdma or none of it */ if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { rm->m_final_op = &rm->rdma; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue + */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); - if (ret) + if (ret) { + clear_bit(RDS_MSG_MAPPED, &rm->m_flags); + wake_up_interruptible(&rm->m_flush_wait); break; + } conn->c_xmit_rdma_sent = 1; - /* The transport owns the mapped memory for now. - * You can't unmap it while it's on the send queue */ - set_bit(RDS_MSG_MAPPED, &rm->m_flags); } if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { rm->m_final_op = &rm->atomic; + /* The transport owns the mapped memory for now. + * You can't unmap it while it's on the send queue + */ + set_bit(RDS_MSG_MAPPED, &rm->m_flags); ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); - if (ret) + if (ret) { + clear_bit(RDS_MSG_MAPPED, &rm->m_flags); + wake_up_interruptible(&rm->m_flush_wait); break; + } conn->c_xmit_atomic_sent = 1; - /* The transport owns the mapped memory for now. - * You can't unmap it while it's on the send queue */ - set_bit(RDS_MSG_MAPPED, &rm->m_flags); } /* @@ -411,15 +420,19 @@ over_batch: */ if (ret == 0) { smp_mb(); - if (!list_empty(&conn->c_send_queue) && + if ((test_bit(0, &conn->c_map_queued) || + !list_empty(&conn->c_send_queue)) && send_gen == conn->c_send_gen) { rds_stats_inc(s_send_lock_queue_raced); - goto restart; + if (batch_count < send_batch_count) + goto restart; + queue_delayed_work(rds_wq, &conn->c_send_w, 1); } } out: return ret; } +EXPORT_SYMBOL_GPL(rds_send_xmit); static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm) { @@ -769,8 +782,22 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) while (!list_empty(&list)) { rm = list_entry(list.next, struct rds_message, m_sock_item); list_del_init(&rm->m_sock_item); - rds_message_wait(rm); + + /* just in case the code above skipped this message + * because RDS_MSG_ON_CONN wasn't set, run it again here + * taking m_rs_lock is the only thing that keeps us + * from racing with ack processing. + */ + spin_lock_irqsave(&rm->m_rs_lock, flags); + + spin_lock(&rs->rs_lock); + __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); + spin_unlock(&rs->rs_lock); + + rm->m_rs = NULL; + spin_unlock_irqrestore(&rm->m_rs_lock, flags); + rds_message_put(rm); } } @@ -986,11 +1013,18 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) release_sock(sk); } - /* racing with another thread binding seems ok here */ + lock_sock(sk); if (daddr == 0 || rs->rs_bound_addr == 0) { + release_sock(sk); ret = -ENOTCONN; /* XXX not a great errno */ goto out; } + release_sock(sk); + + if (payload_len > rds_sk_sndbuf(rs)) { + ret = -EMSGSIZE; + goto out; + } /* size of rm including all sgs */ ret = rds_rm_size(msg, payload_len); @@ -1023,7 +1057,8 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) if (rs->rs_conn && rs->rs_conn->c_faddr == daddr) conn = rs->rs_conn; else { - conn = rds_conn_create_outgoing(rs->rs_bound_addr, daddr, + conn = rds_conn_create_outgoing(sock_net(sock->sk), + rs->rs_bound_addr, daddr, rs->rs_transport, sock->sk->sk_allocation); if (IS_ERR(conn)) { @@ -1063,11 +1098,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port, dport, &queued)) { rds_stats_inc(s_send_queue_full); - /* XXX make sure this is reasonable */ - if (payload_len > rds_sk_sndbuf(rs)) { - ret = -EMSGSIZE; - goto out; - } + if (nonblock) { ret = -EAGAIN; goto out; @@ -1095,8 +1126,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) */ rds_stats_inc(s_send_queued); - if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - rds_send_xmit(conn); + ret = rds_send_xmit(conn); + if (ret == -ENOMEM || ret == -EAGAIN) + queue_delayed_work(rds_wq, &conn->c_send_w, 1); rds_message_put(rm); return payload_len; @@ -1152,8 +1184,8 @@ rds_send_pong(struct rds_connection *conn, __be16 dport) rds_stats_inc(s_send_queued); rds_stats_inc(s_send_pong); - if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags)) - queue_delayed_work(rds_wq, &conn->c_send_w, 0); + /* schedule the send work on rds_wq */ + queue_delayed_work(rds_wq, &conn->c_send_w, 1); rds_message_put(rm); return 0; |