diff options
Diffstat (limited to 'kernel/ipc/msg.c')
-rw-r--r-- | kernel/ipc/msg.c | 105 |
1 files changed, 37 insertions, 68 deletions
diff --git a/kernel/ipc/msg.c b/kernel/ipc/msg.c index cedbf5f50..b8c5e7f2b 100644 --- a/kernel/ipc/msg.c +++ b/kernel/ipc/msg.c @@ -76,7 +76,7 @@ struct msg_sender { static inline struct msg_queue *msq_obtain_object(struct ipc_namespace *ns, int id) { - struct kern_ipc_perm *ipcp = ipc_obtain_object(&msg_ids(ns), id); + struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&msg_ids(ns), id); if (IS_ERR(ipcp)) return ERR_CAST(ipcp); @@ -137,13 +137,6 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params) return retval; } - /* ipc_addid() locks msq upon success. */ - id = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni); - if (id < 0) { - ipc_rcu_putref(msq, msg_rcu_free); - return id; - } - msq->q_stime = msq->q_rtime = 0; msq->q_ctime = get_seconds(); msq->q_cbytes = msq->q_qnum = 0; @@ -153,6 +146,13 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params) INIT_LIST_HEAD(&msq->q_receivers); INIT_LIST_HEAD(&msq->q_senders); + /* ipc_addid() locks msq upon success. */ + id = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni); + if (id < 0) { + ipc_rcu_putref(msq, msg_rcu_free); + return id; + } + ipc_unlock_object(&msq->q_perm); rcu_read_unlock(); @@ -183,29 +183,15 @@ static void ss_wakeup(struct list_head *h, int kill) } } -static void expunge_all(struct msg_queue *msq, int res) +static void expunge_all(struct msg_queue *msq, int res, + struct wake_q_head *wake_q) { struct msg_receiver *msr, *t; list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) { - /* - * Make sure that the wakeup doesnt preempt - * this CPU prematurely. (on PREEMPT_RT) - */ - preempt_disable_rt(); - msr->r_msg = NULL; /* initialize expunge ordering */ - wake_up_process(msr->r_tsk); - /* - * Ensure that the wakeup is visible before setting r_msg as - * the receiving end depends on it: either spinning on a nil, - * or dealing with -EAGAIN cases. See lockless receive part 1 - * and 2 in do_msgrcv(). - */ - smp_mb(); + wake_q_add(wake_q, msr->r_tsk); msr->r_msg = ERR_PTR(res); - - preempt_enable_rt(); } } @@ -221,11 +207,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp) { struct msg_msg *msg, *t; struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm); + WAKE_Q(wake_q); - expunge_all(msq, -EIDRM); + expunge_all(msq, -EIDRM, &wake_q); ss_wakeup(&msq->q_senders, 1); msg_rmid(ns, msq); ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); rcu_read_unlock(); list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) { @@ -350,6 +338,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd, struct kern_ipc_perm *ipcp; struct msqid64_ds uninitialized_var(msqid64); struct msg_queue *msq; + WAKE_Q(wake_q); int err; if (cmd == IPC_SET) { @@ -397,7 +386,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd, /* sleeping receivers might be excluded by * stricter permissions. */ - expunge_all(msq, -EAGAIN); + expunge_all(msq, -EAGAIN, &wake_q); /* sleeping senders might be able to send * due to a larger queue size. */ @@ -410,6 +399,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd, out_unlock0: ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); out_unlock1: rcu_read_unlock(); out_up: @@ -574,7 +564,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode) return 0; } -static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) +static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg, + struct wake_q_head *wake_q) { struct msg_receiver *msr, *t; @@ -582,39 +573,21 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg) if (testmsg(msg, msr->r_msgtype, msr->r_mode) && !security_msg_queue_msgrcv(msq, msg, msr->r_tsk, msr->r_msgtype, msr->r_mode)) { - /* - * Make sure that the wakeup doesnt preempt - * this CPU prematurely. (on PREEMPT_RT) - */ - preempt_disable_rt(); list_del(&msr->r_list); if (msr->r_maxsize < msg->m_ts) { - /* initialize pipelined send ordering */ - msr->r_msg = NULL; - wake_up_process(msr->r_tsk); - smp_mb(); /* see barrier comment below */ + wake_q_add(wake_q, msr->r_tsk); msr->r_msg = ERR_PTR(-E2BIG); } else { - msr->r_msg = NULL; msq->q_lrpid = task_pid_vnr(msr->r_tsk); msq->q_rtime = get_seconds(); - wake_up_process(msr->r_tsk); - /* - * Ensure that the wakeup is visible before - * setting r_msg, as the receiving end depends - * on it. See lockless receive part 1 and 2 in - * do_msgrcv(). - */ - smp_mb(); + wake_q_add(wake_q, msr->r_tsk); msr->r_msg = msg; - preempt_enable_rt(); - return 1; } - preempt_enable_rt(); } } + return 0; } @@ -625,6 +598,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, struct msg_msg *msg; int err; struct ipc_namespace *ns; + WAKE_Q(wake_q); ns = current->nsproxy->ipc_ns; @@ -710,7 +684,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, msq->q_lspid = task_tgid_vnr(current); msq->q_stime = get_seconds(); - if (!pipelined_send(msq, msg)) { + if (!pipelined_send(msq, msg, &wake_q)) { /* no one is waiting for this message, enqueue it */ list_add_tail(&msg->m_list, &msq->q_messages); msq->q_cbytes += msgsz; @@ -724,6 +698,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext, out_unlock0: ipc_unlock_object(&msq->q_perm); + wake_up_q(&wake_q); out_unlock1: rcu_read_unlock(); if (msg != NULL) @@ -944,31 +919,25 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl rcu_read_lock(); /* Lockless receive, part 2: - * Wait until pipelined_send or expunge_all are outside of - * wake_up_process(). There is a race with exit(), see - * ipc/mqueue.c for the details. + * The work in pipelined_send() and expunge_all(): + * - Set pointer to message + * - Queue the receiver task for later wakeup + * - Wake up the process after the lock is dropped. + * + * Should the process wake up before this wakeup (due to a + * signal) it will either see the message and continue … */ - msg = (struct msg_msg *)msr_d.r_msg; - while (msg == NULL) { - cpu_relax(); - msg = (struct msg_msg *)msr_d.r_msg; - } - /* Lockless receive, part 3: - * If there is a message or an error then accept it without - * locking. - */ + msg = (struct msg_msg *)msr_d.r_msg; if (msg != ERR_PTR(-EAGAIN)) goto out_unlock1; - /* Lockless receive, part 3: - * Acquire the queue spinlock. - */ + /* + * … or see -EAGAIN, acquire the lock to check the message + * again. + */ ipc_lock_object(&msq->q_perm); - /* Lockless receive, part 4: - * Repeat test after acquiring the spinlock. - */ msg = (struct msg_msg *)msr_d.r_msg; if (msg != ERR_PTR(-EAGAIN)) goto out_unlock0; |