Date:      Mon, 5 May 2025 19:56:24 GMT
From:      Gleb Smirnoff <glebius@FreeBSD.org>
To:        src-committers@FreeBSD.org, dev-commits-src-all@FreeBSD.org, dev-commits-src-main@FreeBSD.org
Subject:   git: d15792780760 - main - unix: new implementation of unix/stream & unix/seqpacket
Message-ID:  <202505051956.545JuOPR085707@gitrepo.freebsd.org>

The branch main has been updated by glebius:

URL: https://cgit.FreeBSD.org/src/commit/?id=d15792780760ef94647af9b377b5f0a80e1826bc

commit d15792780760ef94647af9b377b5f0a80e1826bc
Author:     Gleb Smirnoff <glebius@FreeBSD.org>
AuthorDate: 2025-05-05 19:56:04 +0000
Commit:     Gleb Smirnoff <glebius@FreeBSD.org>
CommitDate: 2025-05-05 19:56:04 +0000

    unix: new implementation of unix/stream & unix/seqpacket
    
    [this is an updated version of d80a97def9a1, which had been reverted]
    
    Provide protocol specific pr_sosend and pr_soreceive for PF_UNIX
    SOCK_STREAM sockets and implement SOCK_SEQPACKET sockets as an extension
    of SOCK_STREAM.  The change meets three goals: get rid of unix(4)-specific
    stuff in the generic socket code, provide faster and more robust unix/stream
    sockets, and bring unix/seqpacket much closer to the specification.  Highlights
    follow:
    
    - The send buffer is now truly bypassed.  Previously it was always empty,
    but send(2) still needed to acquire its lock and do a variety of tricks to
    be woken up at the right time while sleeping on it.  Now the only two
    things we care about in the send buffer are the I/O sx(9) lock that
    serializes operations and the value of so_snd.sb_hiwat, which we can read
    without obtaining a lock.  The sleep of a send(2) happens on the mutex of
    the peer's receive buffer.  A bulk send/recv of data with large socket
    buffers will make both syscalls just bounce between owning the receive
    buffer lock and copyin(9)/copyout(9); no other locks are involved.  Since
    event notification mechanisms such as select(2), poll(2) and kevent(2)
    use the state of the send buffer to monitor writability, the new
    implementation provides protocol-specific pr_sopoll and pr_kqfilter (an
    illustrative userland sketch follows this list).  Support for sendfile(2)
    over unix/stream is preserved by providing protocol-specific pr_send and
    pr_sendfile_wait methods.
    
    - The implementation uses the new mchain structure to manipulate mbuf
    chains (a schematic sketch of the pattern follows this list).  Note that
    this required converting to mchain two functions that are shared with
    unix/dgram: unp_internalize() and unp_addsockcred(), as well as adding a
    new shared one, uipc_process_kernel_mbuf().  This induces some
    non-functional changes in the unix/dgram code as well.  There is room for
    improvement here, as right now it is a mix of mchain and manually managed
    mbuf chains.
    
    - unix/seqpacket, previously marked as PR_ADDR & PR_ATOMIC and thus
    treated as a datagram socket by the generic socket code, now becomes a
    true stream socket with record markers (see the userland example after
    this list).
    
    - Note on aio(4).  The first problem with socket aio(4) is that it uses
    socket buffer locks for queueing and, piggybacking on this locking, calls
    soreadable() and sowriteable() directly.  Ideally it should use the
    pr_sopoll() method.  The second problem is that, unlike a syscall, aio(4)
    wants a consistent uio structure upon return.  This is incompatible with
    our speculative read optimization, so in the case of an aio(4) write we
    need to restore the consistency of the uio.  At this point we work around
    those problems on the unix(4) side, but ideally those workarounds should
    be a problem of socket aio(4) (not a first-class citizen) rather than of
    unix(4), which is definitely a primary facility.  (See the aio(4) sketch
    after this list.)
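    
    The sketches below are illustrations only and are not part of the change;
    the userland snippets use only the standard sockets API, and the
    kernel-side sketch reuses identifiers that appear in this diff.
    
    Writability monitoring (first highlight): with this change POLLOUT on a
    unix/stream socket is computed by the protocol's pr_sopoll(), effectively
    from the free space in the peer's receive buffer rather than from generic
    send buffer state.  A minimal writer that relies on poll(2):
    
        #include <sys/types.h>
        #include <sys/socket.h>
        #include <poll.h>
    
        /* Block until the unix/stream socket fd is writable, then send. */
        static ssize_t
        write_when_ready(int fd, const void *buf, size_t len)
        {
                struct pollfd pfd = { .fd = fd, .events = POLLOUT };
    
                if (poll(&pfd, 1, -1) != 1)
                        return (-1);
                /* A short write is still possible if the peer's buffer fills. */
                return (send(fd, buf, len, 0));
        }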
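    
    mchain pattern (second highlight), schematically.  Declarations, locking
    and error handling are omitted, so this is not compilable as-is; the
    identifiers are the ones used by the new uipc_sosend path in this diff:
    
        struct mchain mc = MCHAIN_INITIALIZER(&mc);
        struct mchain mcnext = MCHAIN_INITIALIZER(&mcnext);
    
        /* Copy userland data into a chain, sized to our send buffer. */
        error = mc_uiotomc(&mc, uio, so->so_snd.sb_hiwat, 0, M_WAITOK,
            eor ? M_EOR : 0);
        /* Carve off what fits into the peer's receive buffer space. */
        if (space < mc.mc_len)
                (void)mc_split(&mc, &mcnext, space, M_WAITOK);
        /* Append to the peer's buffer; the chain tracked the accounting. */
        STAILQ_CONCAT(&sb->uxst_mbq, &mc.mc_q);
        sb->sb_acc += mc.mc_len;
        sb->sb_ccc += mc.mc_len;
        sb->sb_mbcnt += mc.mc_mlen;
        /* On error paths the whole chain is released with mc_freem(&mc). */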
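    
    Record boundaries on unix/seqpacket (third highlight), shown with the
    standard API; each recv(2) returns exactly one record, even though the
    new implementation stores the data as a stream with record markers:
    
        #include <sys/types.h>
        #include <sys/socket.h>
        #include <assert.h>
        #include <string.h>
    
        int
        main(void)
        {
                int sv[2];
                char buf[64];
                ssize_t n;
    
                if (socketpair(PF_LOCAL, SOCK_SEQPACKET, 0, sv) != 0)
                        return (1);
                (void)send(sv[0], "first", 5, 0);
                (void)send(sv[0], "second", 6, 0);
                /* One record per recv(2), never a concatenation of two. */
                n = recv(sv[1], buf, sizeof(buf), 0);
                assert(n == 5 && memcmp(buf, "first", 5) == 0);
                n = recv(sv[1], buf, sizeof(buf), 0);
                assert(n == 6 && memcmp(buf, "second", 6) == 0);
                return (0);
        }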
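    
    Socket aio(4) (fourth highlight), a minimal userland sketch; the
    socket_aio_write() helper is made up for illustration and simply
    busy-waits for completion:
    
        #include <sys/types.h>
        #include <sys/socket.h>
        #include <aio.h>
        #include <errno.h>
        #include <string.h>
        #include <unistd.h>
    
        /* Queue an asynchronous write on a socket and wait for its result. */
        static ssize_t
        socket_aio_write(int fd, void *buf, size_t len)
        {
                struct aiocb cb;
    
                memset(&cb, 0, sizeof(cb));
                cb.aio_fildes = fd;
                cb.aio_buf = buf;
                cb.aio_nbytes = len;
                if (aio_write(&cb) != 0)
                        return (-1);
                while (aio_error(&cb) == EINPROGRESS)
                        usleep(1000);
                /* Bytes written, as tracked by the uio inside the aio job. */
                return (aio_return(&cb));
        }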
---
 sys/kern/uipc_usrreq.c | 1722 +++++++++++++++++++++++++++++++++++++-----------
 sys/sys/sockbuf.h      |   12 +
 2 files changed, 1343 insertions(+), 391 deletions(-)

diff --git a/sys/kern/uipc_usrreq.c b/sys/kern/uipc_usrreq.c
index 79e4da5c8698..06b3317dc775 100644
--- a/sys/kern/uipc_usrreq.c
+++ b/sys/kern/uipc_usrreq.c
@@ -5,7 +5,7 @@
  *	The Regents of the University of California. All Rights Reserved.
  * Copyright (c) 2004-2009 Robert N. M. Watson All Rights Reserved.
  * Copyright (c) 2018 Matthew Macy
- * Copyright (c) 2022 Gleb Smirnoff <glebius@FreeBSD.org>
+ * Copyright (c) 2022-2025 Gleb Smirnoff <glebius@FreeBSD.org>
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -73,6 +73,7 @@
 #include <sys/mount.h>
 #include <sys/mutex.h>
 #include <sys/namei.h>
+#include <sys/poll.h>
 #include <sys/proc.h>
 #include <sys/protosw.h>
 #include <sys/queue.h>
@@ -142,11 +143,14 @@ static struct timeout_task unp_gc_task;
 static struct task	unp_defer_task;
 
 /*
- * Both send and receive buffers are allocated PIPSIZ bytes of buffering for
- * stream sockets, although the total for sender and receiver is actually
- * only PIPSIZ.
+ * SOCK_STREAM and SOCK_SEQPACKET unix(4) sockets fully bypass the send buffer,
+ * however the notion of send buffer still makes sense with them.  Its size is
+ * the amount of space that a send(2) syscall may copyin(9) before checking
+ * with the receive buffer of a peer.  Although that data is not linked
+ * anywhere yet and is only pointed to by a stack variable, effectively it is
+ * a buffer that needs to be sized.
  *
- * Datagram sockets really use the sendspace as the maximum datagram size,
+ * SOCK_DGRAM sockets really use the sendspace as the maximum datagram size,
  * and don't really want to reserve the sendspace.  Their recvspace should be
  * large enough for at least one max-size datagram plus address.
  */
@@ -157,7 +161,7 @@ static u_long	unpst_sendspace = PIPSIZ;
 static u_long	unpst_recvspace = PIPSIZ;
 static u_long	unpdg_maxdgram = 8*1024;	/* support 8KB syslog msgs */
 static u_long	unpdg_recvspace = 16*1024;
-static u_long	unpsp_sendspace = PIPSIZ;	/* really max datagram size */
+static u_long	unpsp_sendspace = PIPSIZ;
 static u_long	unpsp_recvspace = PIPSIZ;
 
 static SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW | CTLFLAG_MPSAFE, 0,
@@ -292,7 +296,7 @@ static int	unp_connect(struct socket *, struct sockaddr *,
 		    struct thread *);
 static int	unp_connectat(int, struct socket *, struct sockaddr *,
 		    struct thread *, bool);
-static void	unp_connect2(struct socket *so, struct socket *so2);
+static void	unp_connect2(struct socket *, struct socket *, bool);
 static void	unp_disconnect(struct unpcb *unp, struct unpcb *unp2);
 static void	unp_dispose(struct socket *so);
 static void	unp_shutdown(struct unpcb *);
@@ -301,15 +305,18 @@ static void	unp_gc(__unused void *, int);
 static void	unp_scan(struct mbuf *, void (*)(struct filedescent **, int));
 static void	unp_discard(struct file *);
 static void	unp_freerights(struct filedescent **, int);
-static int	unp_internalize(struct mbuf **, struct thread *,
-		    struct mbuf **, u_int *, u_int *);
+static int	unp_internalize(struct mbuf *, struct mchain *,
+		    struct thread *);
 static void	unp_internalize_fp(struct file *);
 static int	unp_externalize(struct mbuf *, struct mbuf **, int);
 static int	unp_externalize_fp(struct file *);
-static struct mbuf	*unp_addsockcred(struct thread *, struct mbuf *,
-		    int, struct mbuf **, u_int *, u_int *);
+static void	unp_addsockcred(struct thread *, struct mchain *, int);
 static void	unp_process_defers(void * __unused, int);
 
+static void	uipc_wrknl_lock(void *);
+static void	uipc_wrknl_unlock(void *);
+static void	uipc_wrknl_assert_lock(void *, int);
+
 static void
 unp_pcb_hold(struct unpcb *unp)
 {
@@ -417,6 +424,41 @@ unp_pcb_lock_peer(struct unpcb *unp)
 	return (unp2);
 }
 
+/*
+ * Try to lock peer of our socket for purposes of sending data to it.
+ */
+static int
+uipc_lock_peer(struct socket *so, struct unpcb **unp2)
+{
+	struct unpcb *unp;
+	int error;
+
+	unp = sotounpcb(so);
+	UNP_PCB_LOCK(unp);
+	*unp2 = unp_pcb_lock_peer(unp);
+	if (__predict_false(so->so_error != 0)) {
+		error = so->so_error;
+		so->so_error = 0;
+		UNP_PCB_UNLOCK(unp);
+		if (*unp2 != NULL)
+			UNP_PCB_UNLOCK(*unp2);
+		return (error);
+	}
+	if (__predict_false(*unp2 == NULL)) {
+		/*
+		 * Different error code for a previously connected socket and
+		 * a never connected one.  The SS_ISDISCONNECTED is set in the
+		 * unp_soisdisconnected() and is synchronized by the pcb lock.
+		 */
+		error = so->so_state & SS_ISDISCONNECTED ? EPIPE : ENOTCONN;
+		UNP_PCB_UNLOCK(unp);
+		return (error);
+	}
+	UNP_PCB_UNLOCK(unp);
+
+	return (0);
+}
+
 static void
 uipc_abort(struct socket *so)
 {
@@ -446,11 +488,6 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
 
 	KASSERT(so->so_pcb == NULL, ("uipc_attach: so_pcb != NULL"));
 	switch (so->so_type) {
-	case SOCK_STREAM:
-		sendspace = unpst_sendspace;
-		recvspace = unpst_recvspace;
-		break;
-
 	case SOCK_DGRAM:
 		STAILQ_INIT(&so->so_rcv.uxdg_mb);
 		STAILQ_INIT(&so->so_snd.uxdg_mb);
@@ -463,11 +500,27 @@ uipc_attach(struct socket *so, int proto, struct thread *td)
 		sendspace = recvspace = unpdg_recvspace;
 		break;
 
+	case SOCK_STREAM:
+		sendspace = unpst_sendspace;
+		recvspace = unpst_recvspace;
+		goto common;
+
 	case SOCK_SEQPACKET:
 		sendspace = unpsp_sendspace;
 		recvspace = unpsp_recvspace;
+common:
+		/*
+		 * XXXGL: we need to initialize the mutex with MTX_DUPOK.
+		 * Ideally, protocols that have PR_SOCKBUF should be
+		 * responsible for mutex initialization officially, and then
+		 * this ugliness with mtx_destroy(); mtx_init(); would go away.
+		 */
+		mtx_destroy(&so->so_rcv_mtx);
+		mtx_init(&so->so_rcv_mtx, "so_rcv", NULL, MTX_DEF | MTX_DUPOK);
+		knlist_init(&so->so_wrsel.si_note, so, uipc_wrknl_lock,
+		    uipc_wrknl_unlock, uipc_wrknl_assert_lock);
+		STAILQ_INIT(&so->so_rcv.uxst_mbq);
 		break;
-
 	default:
 		panic("uipc_attach");
 	}
@@ -737,7 +790,7 @@ uipc_connect2(struct socket *so1, struct socket *so2)
 	unp2 = so2->so_pcb;
 	KASSERT(unp2 != NULL, ("uipc_connect2: unp2 == NULL"));
 	unp_pcb_lock_pair(unp, unp2);
-	unp_connect2(so1, so2);
+	unp_connect2(so1, so2, false);
 	unp_pcb_unlock_pair(unp, unp2);
 
 	return (0);
@@ -820,6 +873,11 @@ uipc_detach(struct socket *so)
 		taskqueue_enqueue_timeout(taskqueue_thread, &unp_gc_task, -1);
 
 	switch (so->so_type) {
+	case SOCK_STREAM:
+	case SOCK_SEQPACKET:
+		MPASS(SOLISTENING(so) || (STAILQ_EMPTY(&so->so_rcv.uxst_mbq) &&
+		    so->so_rcv.uxst_peer == NULL));
+		break;
 	case SOCK_DGRAM:
 		/*
 		 * Everything should have been unlinked/freed by unp_dispose()
@@ -875,6 +933,12 @@ uipc_listen(struct socket *so, int backlog, struct thread *td)
 	error = solisten_proto_check(so);
 	if (error == 0) {
 		cru2xt(td, &unp->unp_peercred);
+		if (!SOLISTENING(so)) {
+			(void)chgsbsize(so->so_cred->cr_uidinfo,
+			    &so->so_snd.sb_hiwat, 0, RLIM_INFINITY);
+			(void)chgsbsize(so->so_cred->cr_uidinfo,
+			    &so->so_rcv.sb_hiwat, 0, RLIM_INFINITY);
+		}
 		solisten_proto(so, backlog);
 	}
 	SOCK_UNLOCK(so);
@@ -908,187 +972,880 @@ uipc_peeraddr(struct socket *so, struct sockaddr *ret)
 	return (0);
 }
 
+/*
+ * A pr_sosend() call with an mbuf instead of a uio comes from a kernel
+ * thread.  NFS, netgraph(4) and other subsystems can call into socket code.
+ * This function conditions the mbuf so that it can be safely put onto a
+ * socket buffer, and calculates its char count and mbuf count.
+ *
+ * Note: we don't support receiving control data from a kernel thread.  Our
+ * pr_sosend methods have MPASS() to check that.  This may change.
+ */
+static void
+uipc_reset_kernel_mbuf(struct mbuf *m, struct mchain *mc)
+{
+
+	M_ASSERTPKTHDR(m);
+
+	m_clrprotoflags(m);
+	m_tag_delete_chain(m, NULL);
+	m->m_pkthdr.rcvif = NULL;
+	m->m_pkthdr.flowid = 0;
+	m->m_pkthdr.csum_flags = 0;
+	m->m_pkthdr.fibnum = 0;
+	m->m_pkthdr.rsstype = 0;
+
+	mc_init_m(mc, m);
+	MPASS(m->m_pkthdr.len == mc->mc_len);
+}
+
+#ifdef SOCKBUF_DEBUG
+static inline void
+uipc_stream_sbcheck(struct sockbuf *sb)
+{
+	struct mbuf *d;
+	u_int dacc, dccc, dctl, dmbcnt;
+	bool notready = false;
+
+	dacc = dccc = dctl = dmbcnt = 0;
+	STAILQ_FOREACH(d, &sb->uxst_mbq, m_stailq) {
+		if (d == sb->uxst_fnrdy)
+			notready = true;
+		if (notready)
+			MPASS(d->m_flags & M_NOTREADY);
+		if (d->m_type == MT_CONTROL)
+			dctl += d->m_len;
+		else if (d->m_type == MT_DATA) {
+			dccc +=  d->m_len;
+			if (!notready)
+				dacc += d->m_len;
+		} else
+			MPASS(0);
+		dmbcnt += MSIZE;
+		if (d->m_flags & M_EXT)
+			dmbcnt += d->m_ext.ext_size;
+		if (d->m_stailq.stqe_next == NULL)
+			MPASS(sb->uxst_mbq.stqh_last == &d->m_stailq.stqe_next);
+	}
+	MPASS(sb->uxst_fnrdy == NULL || notready);
+	MPASS(dacc == sb->sb_acc);
+	MPASS(dccc == sb->sb_ccc);
+	MPASS(dctl == sb->sb_ctl);
+	MPASS(dmbcnt == sb->sb_mbcnt);
+	(void)STAILQ_EMPTY(&sb->uxst_mbq);
+}
+#define	UIPC_STREAM_SBCHECK(sb)	uipc_stream_sbcheck(sb)
+#else
+#define	UIPC_STREAM_SBCHECK(sb)	do {} while (0)
+#endif
+
+/*
+ * uipc_stream_sbspace() returns how much a writer can send, limited by char
+ * count or mbuf memory use, whatever ends first.
+ *
+ * An obvious and legitimate reason for a socket having more data than allowed
+ * is lowering the limit with setsockopt(SO_RCVBUF) on an already full buffer.
+ * Also, sb_mbcnt may overcommit sb_mbmax in case a previous write observed
+ * 'space < mbspace', but the mchain allocated to hold 'space' bytes of data
+ * ended up with 'mc_mlen > mbspace'.  A typical scenario would be a full
+ * buffer with a writer trying to push in a large write, and a slow reader
+ * that reads just a few bytes at a time.  In that case the writer will keep
+ * creating new mbufs with mc_split().  These mbufs will carry few chars, but
+ * will all point at the same cluster, thus each adding the cluster size to
+ * sb_mbcnt.  This means we will count the same cluster many times, potentially
+ * underutilizing the socket buffer.  We aren't optimizing towards ineffective
+ * readers.  The classic socket buffer had the same "feature".
+ */
+static inline u_int
+uipc_stream_sbspace(struct sockbuf *sb)
+{
+	u_int space, mbspace;
+
+	if (__predict_true(sb->sb_hiwat >= sb->sb_ccc + sb->sb_ctl))
+		space = sb->sb_hiwat - sb->sb_ccc - sb->sb_ctl;
+	else
+		return (0);
+	if (__predict_true(sb->sb_mbmax >= sb->sb_mbcnt))
+		mbspace = sb->sb_mbmax - sb->sb_mbcnt;
+	else
+		return (0);
+
+	return (min(space, mbspace));
+}
+
 static int
-uipc_rcvd(struct socket *so, int flags)
+uipc_sosend_stream_or_seqpacket(struct socket *so, struct sockaddr *addr,
+    struct uio *uio0, struct mbuf *m, struct mbuf *c, int flags,
+    struct thread *td)
 {
-	struct unpcb *unp, *unp2;
+	struct unpcb *unp2;
 	struct socket *so2;
-	u_int mbcnt, sbcc;
+	struct sockbuf *sb;
+	struct uio *uio;
+	struct mchain mc, cmc;
+	size_t resid, sent;
+	bool nonblock, eor, aio;
+	int error;
 
-	unp = sotounpcb(so);
-	KASSERT(unp != NULL, ("%s: unp == NULL", __func__));
-	KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET,
-	    ("%s: socktype %d", __func__, so->so_type));
+	MPASS((uio0 != NULL && m == NULL) || (m != NULL && uio0 == NULL));
+	MPASS(m == NULL || c == NULL);
+
+	if (__predict_false(flags & MSG_OOB))
+		return (EOPNOTSUPP);
+
+	nonblock = (so->so_state & SS_NBIO) ||
+	    (flags & (MSG_DONTWAIT | MSG_NBIO));
+	eor = flags & MSG_EOR;
+
+	mc = MCHAIN_INITIALIZER(&mc);
+	cmc = MCHAIN_INITIALIZER(&cmc);
+	sent = 0;
+	aio = false;
+
+	if (m == NULL) {
+		if (c != NULL && (error = unp_internalize(c, &cmc, td)))
+			goto out;
+		/*
+		 * This function may read more data from the uio than it would
+		 * then place on socket.  That would leave uio inconsistent
+		 * upon return.  Normally uio is allocated on the stack of the
+		 * syscall thread and we don't care about leaving it consistent.
+		 * However, aio(9) will allocate a uio as part of job and will
+		 * use it to track progress.  We detect aio(9) by checking the
+		 * SB_AIO_RUNNING flag.  It is safe to check it without a lock
+		 * because it is set and cleared in the same taskqueue thread.
+		 *
+		 * This check can also produce a false positive: there is an
+		 * aio(9) job and also there is a syscall we are serving now.
+		 * No sane software does that; it would lead to a mess in
+		 * the socket buffer, as aio(9) doesn't grab the I/O sx(9).
+		 * But syzkaller can create this mess.  For such a false
+		 * positive our goal is just not to panic or leak memory.
+		 */
+		if (__predict_false(so->so_snd.sb_flags & SB_AIO_RUNNING)) {
+			uio = cloneuio(uio0);
+			aio = true;
+		} else {
+			uio = uio0;
+			resid = uio->uio_resid;
+		}
+		/*
+		 * Optimization for a case when our send fits into the receive
+		 * buffer - do the copyin before taking any locks, sized to our
+		 * send buffer.  Later copyins will also take into account
+		 * space in the peer's receive buffer.
+		 */
+		error = mc_uiotomc(&mc, uio, so->so_snd.sb_hiwat, 0, M_WAITOK,
+		    eor ? M_EOR : 0);
+		if (__predict_false(error))
+			goto out2;
+	} else
+		uipc_reset_kernel_mbuf(m, &mc);
+
+	error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags));
+	if (error)
+		goto out2;
+
+	if (__predict_false((error = uipc_lock_peer(so, &unp2)) != 0))
+		goto out3;
+
+	if (unp2->unp_flags & UNP_WANTCRED_MASK) {
+		/*
+		 * Credentials are passed only once on SOCK_STREAM and
+		 * SOCK_SEQPACKET (LOCAL_CREDS => WANTCRED_ONESHOT), or
+		 * forever (LOCAL_CREDS_PERSISTENT => WANTCRED_ALWAYS).
+		 */
+		unp_addsockcred(td, &cmc, unp2->unp_flags);
+		unp2->unp_flags &= ~UNP_WANTCRED_ONESHOT;
+	}
 
 	/*
-	 * Adjust backpressure on sender and wakeup any waiting to write.
-	 *
-	 * The unp lock is acquired to maintain the validity of the unp_conn
-	 * pointer; no lock on unp2 is required as unp2->unp_socket will be
-	 * static as long as we don't permit unp2 to disconnect from unp,
-	 * which is prevented by the lock on unp.  We cache values from
-	 * so_rcv to avoid holding the so_rcv lock over the entire
-	 * transaction on the remote so_snd.
+	 * Cycle through the data to send and available space in the peer's
+	 * receive buffer.  Put a reference on the peer socket, so that it
+	 * doesn't get freed while we sbwait().  If peer goes away, we will
+	 * observe the SBS_CANTRCVMORE and our sorele() will finalize peer's
+	 * socket destruction.
 	 */
-	SOCKBUF_LOCK(&so->so_rcv);
-	mbcnt = so->so_rcv.sb_mbcnt;
-	sbcc = sbavail(&so->so_rcv);
-	SOCKBUF_UNLOCK(&so->so_rcv);
+	so2 = unp2->unp_socket;
+	soref(so2);
+	UNP_PCB_UNLOCK(unp2);
+	sb = &so2->so_rcv;
+	while (mc.mc_len + cmc.mc_len > 0) {
+		struct mchain mcnext = MCHAIN_INITIALIZER(&mcnext);
+		u_int space;
+
+		SOCK_RECVBUF_LOCK(so2);
+restart:
+		UIPC_STREAM_SBCHECK(sb);
+		if (__predict_false(cmc.mc_len > sb->sb_hiwat)) {
+			SOCK_RECVBUF_UNLOCK(so2);
+			error = EMSGSIZE;
+			goto out4;
+		}
+		if (__predict_false(sb->sb_state & SBS_CANTRCVMORE)) {
+			SOCK_RECVBUF_UNLOCK(so2);
+			error = EPIPE;
+			goto out4;
+		}
+		/*
+		 * Wait on the peer socket receive buffer until we have enough
+		 * space to put at least control.  The data is a stream and can
+		 * be put partially, but control is really a datagram.
+		 */
+		space = uipc_stream_sbspace(sb);
+		if (space < sb->sb_lowat || space < cmc.mc_len) {
+			if (nonblock) {
+				if (aio)
+					sb->uxst_flags |= UXST_PEER_AIO;
+				SOCK_RECVBUF_UNLOCK(so2);
+				if (aio) {
+					SOCK_SENDBUF_LOCK(so);
+					so->so_snd.sb_ccc =
+					    so->so_snd.sb_hiwat - space;
+					SOCK_SENDBUF_UNLOCK(so);
+				}
+				error = EWOULDBLOCK;
+				goto out4;
+			}
+			if ((error = sbwait(so2, SO_RCV)) != 0) {
+				SOCK_RECVBUF_UNLOCK(so2);
+				goto out4;
+			} else
+				goto restart;
+		}
+		MPASS(space >= cmc.mc_len);
+		space -= cmc.mc_len;
+		if (space == 0) {
+			/* There is space only to send control. */
+			MPASS(!STAILQ_EMPTY(&cmc.mc_q));
+			mcnext = mc;
+			mc = MCHAIN_INITIALIZER(&mc);
+		} else if (space < mc.mc_len) {
+			/* Not enough space. */
+			if (__predict_false(mc_split(&mc, &mcnext, space,
+			    M_NOWAIT) == ENOMEM)) {
+				/*
+				 * If allocation failed use M_WAITOK and merge
+				 * the chain back.  Next time mc_split() will
+				 * easily split at the same place.  Only if we
+				 * race with setsockopt(SO_RCVBUF) shrinking
+				 * sb_hiwat can this happen more than once.
+				 */
+				SOCK_RECVBUF_UNLOCK(so2);
+				(void)mc_split(&mc, &mcnext, space, M_WAITOK);
+				mc_concat(&mc, &mcnext);
+				SOCK_RECVBUF_LOCK(so2);
+				goto restart;
+			}
+			MPASS(mc.mc_len == space);
+		}
+		if (!STAILQ_EMPTY(&cmc.mc_q)) {
+			STAILQ_CONCAT(&sb->uxst_mbq, &cmc.mc_q);
+			sb->sb_ctl += cmc.mc_len;
+			sb->sb_mbcnt += cmc.mc_mlen;
+			cmc.mc_len = 0;
+		}
+		sent += mc.mc_len;
+		sb->sb_acc += mc.mc_len;
+		sb->sb_ccc += mc.mc_len;
+		sb->sb_mbcnt += mc.mc_mlen;
+		STAILQ_CONCAT(&sb->uxst_mbq, &mc.mc_q);
+		UIPC_STREAM_SBCHECK(sb);
+		space = uipc_stream_sbspace(sb);
+		sorwakeup_locked(so2);
+		if (!STAILQ_EMPTY(&mcnext.mc_q)) {
+			/*
+			 * Such assignment is unsafe in general, but it is
+			 * safe with !STAILQ_EMPTY(&mcnext.mc_q).  In C++ we
+			 * could reload = for STAILQs :)
+			 */
+			mc = mcnext;
+		} else if (uio != NULL && uio->uio_resid > 0) {
+			/*
+			 * Copyin sum of peer's receive buffer space and our
+			 * sb_hiwat, which is our virtual send buffer size.
+			 * See comment above unpst_sendspace declaration.
+			 * We are reading sb_hiwat locklessly, because a) we
+			 * don't care about an application that does send(2)
+			 * and setsockopt(2) racing internally, and b) for an
+			 * application that does this in sequence we will see
+			 * the correct value because sbsetopt() uses the buffer
+			 * lock and we have already acquired it at least once.
+			 */
+			error = mc_uiotomc(&mc, uio, space +
+			    atomic_load_int(&so->so_snd.sb_hiwat), 0, M_WAITOK,
+			    eor ? M_EOR : 0);
+			if (__predict_false(error))
+				goto out4;
+		} else
+			mc = MCHAIN_INITIALIZER(&mc);
+	}
+
+	MPASS(STAILQ_EMPTY(&mc.mc_q));
+
+	td->td_ru.ru_msgsnd++;
+out4:
+	sorele(so2);
+out3:
+	SOCK_IO_SEND_UNLOCK(so);
+out2:
+	if (aio) {
+		freeuio(uio);
+		uioadvance(uio0, sent);
+	} else if (uio != NULL)
+		uio->uio_resid = resid - sent;
+	if (!mc_empty(&cmc))
+		unp_scan(mc_first(&cmc), unp_freerights);
+out:
+	mc_freem(&mc);
+	mc_freem(&cmc);
+
+	return (error);
+}
+
+static int
+uipc_soreceive_stream_or_seqpacket(struct socket *so, struct sockaddr **psa,
+    struct uio *uio, struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
+{
+	struct sockbuf *sb = &so->so_rcv;
+	struct mbuf *control, *m, *first, *last, *next;
+	u_int ctl, space, datalen, mbcnt, lastlen;
+	int error, flags;
+	bool nonblock, waitall, peek;
+
+	MPASS(mp0 == NULL);
+
+	if (psa != NULL)
+		*psa = NULL;
+	if (controlp != NULL)
+		*controlp = NULL;
+
+	flags = flagsp != NULL ? *flagsp : 0;
+	nonblock = (so->so_state & SS_NBIO) ||
+	    (flags & (MSG_DONTWAIT | MSG_NBIO));
+	peek = flags & MSG_PEEK;
+	waitall = (flags & MSG_WAITALL) && !peek;
+
 	/*
-	 * There is a benign race condition at this point.  If we're planning to
-	 * clear SB_STOP, but uipc_send is called on the connected socket at
-	 * this instant, it might add data to the sockbuf and set SB_STOP.  Then
-	 * we would erroneously clear SB_STOP below, even though the sockbuf is
-	 * full.  The race is benign because the only ill effect is to allow the
-	 * sockbuf to exceed its size limit, and the size limits are not
-	 * strictly guaranteed anyway.
+	 * This check may fail only on a socket that never went through
+	 * connect(2).  We can check this locklessly because: a) for a newborn
+	 * socket we don't care about applications that may race internally
+	 * between connect(2) and recv(2), and b) for a dying socket, if we miss
+	 * an update by unp_soisdisconnected(), we would still get the check
+	 * right.  For a dying socket we would observe SBS_CANTRCVMORE later.
 	 */
+	if (__predict_false((atomic_load_short(&so->so_state) &
+	    (SS_ISCONNECTED|SS_ISDISCONNECTED)) == 0))
+		return (ENOTCONN);
+
+	error = SOCK_IO_RECV_LOCK(so, SBLOCKWAIT(flags));
+	if (__predict_false(error))
+		return (error);
+
+restart:
+	SOCK_RECVBUF_LOCK(so);
+	UIPC_STREAM_SBCHECK(sb);
+	while (sb->sb_acc < sb->sb_lowat &&
+	    (sb->sb_ctl == 0 || controlp == NULL)) {
+		if (so->so_error) {
+			error = so->so_error;
+			if (!peek)
+				so->so_error = 0;
+			SOCK_RECVBUF_UNLOCK(so);
+			SOCK_IO_RECV_UNLOCK(so);
+			return (error);
+		}
+		if (sb->sb_state & SBS_CANTRCVMORE) {
+			SOCK_RECVBUF_UNLOCK(so);
+			SOCK_IO_RECV_UNLOCK(so);
+			return (0);
+		}
+		if (nonblock) {
+			SOCK_RECVBUF_UNLOCK(so);
+			SOCK_IO_RECV_UNLOCK(so);
+			return (EWOULDBLOCK);
+		}
+		error = sbwait(so, SO_RCV);
+		if (error) {
+			SOCK_RECVBUF_UNLOCK(so);
+			SOCK_IO_RECV_UNLOCK(so);
+			return (error);
+		}
+	}
+
+	MPASS(STAILQ_FIRST(&sb->uxst_mbq));
+	MPASS(sb->sb_acc > 0 || sb->sb_ctl > 0);
+
+	mbcnt = 0;
+	ctl = 0;
+	first = STAILQ_FIRST(&sb->uxst_mbq);
+	if (first->m_type == MT_CONTROL) {
+		control = first;
+		STAILQ_FOREACH_FROM(first, &sb->uxst_mbq, m_stailq) {
+			if (first->m_type != MT_CONTROL)
+				break;
+			ctl += first->m_len;
+			mbcnt += MSIZE;
+			if (first->m_flags & M_EXT)
+				mbcnt += first->m_ext.ext_size;
+		}
+	} else
+		control = NULL;
+
+	/*
+	 * Find split point for the next copyout.  On exit from the loop:
+	 * last == NULL - socket to be flushed
+	 * last != NULL
+	 *   lastlen > last->m_len - uio to be filled, last to be adjusted
+	 *   lastlen == 0          - MT_CONTROL or M_EOR encountered
+	 */
+	space = uio->uio_resid;
+	datalen = 0;
+	for (m = first, last = NULL; m != NULL; m = STAILQ_NEXT(m, m_stailq)) {
+		if (m->m_type != MT_DATA) {
+			last = m;
+			lastlen = 0;
+			break;
+		}
+		if (space >= m->m_len) {
+			space -= m->m_len;
+			datalen += m->m_len;
+			mbcnt += MSIZE;
+			if (m->m_flags & M_EXT)
+				mbcnt += m->m_ext.ext_size;
+			if (m->m_flags & M_EOR) {
+				last = STAILQ_NEXT(m, m_stailq);
+				lastlen = 0;
+				flags |= MSG_EOR;
+				break;
+			}
+		} else {
+			datalen += space;
+			last = m;
+			lastlen = space;
+			break;
+		}
+	}
+
+	UIPC_STREAM_SBCHECK(sb);
+	if (!peek) {
+		if (last == NULL)
+			STAILQ_INIT(&sb->uxst_mbq);
+		else {
+			STAILQ_FIRST(&sb->uxst_mbq) = last;
+			MPASS(last->m_len > lastlen);
+			last->m_len -= lastlen;
+			last->m_data += lastlen;
+		}
+		MPASS(sb->sb_acc >= datalen);
+		sb->sb_acc -= datalen;
+		sb->sb_ccc -= datalen;
+		MPASS(sb->sb_ctl >= ctl);
+		sb->sb_ctl -= ctl;
+		MPASS(sb->sb_mbcnt >= mbcnt);
+		sb->sb_mbcnt -= mbcnt;
+		UIPC_STREAM_SBCHECK(sb);
+		/*
+		 * In a blocking mode peer is sleeping on our receive buffer,
+		 * and we need just wakeup(9) on it.  But to wake up various
+		 * event engines, we need to reach over to peer's selinfo.
+		 * This can be safely done as the socket buffer receive lock
+		 * is protecting us from the peer going away.
+		 */
+		if (__predict_true(sb->uxst_peer != NULL)) {
+			struct selinfo *sel = &sb->uxst_peer->so_wrsel;
+			struct unpcb *unp2;
+			bool aio;
+
+			if ((aio = sb->uxst_flags & UXST_PEER_AIO))
+				sb->uxst_flags &= ~UXST_PEER_AIO;
+			if (sb->uxst_flags & UXST_PEER_SEL) {
+				selwakeuppri(sel, PSOCK);
+				/*
+				 * XXXGL: sowakeup() does SEL_WAITING() without
+				 * locks.
+				 */
+				if (!SEL_WAITING(sel))
+					sb->uxst_flags &= ~UXST_PEER_SEL;
+			}
+			if (sb->sb_flags & SB_WAIT) {
+				sb->sb_flags &= ~SB_WAIT;
+				wakeup(&sb->sb_acc);
+			}
+			KNOTE_LOCKED(&sel->si_note, 0);
+			SOCK_RECVBUF_UNLOCK(so);
+			/*
+			 * XXXGL: need to go through uipc_lock_peer() after
+			 * the receive buffer lock dropped, it was protecting
+			 * us from unp_soisdisconnected().  The aio workarounds
+			 * should be refactored to the aio(4) side.
+			 */
+			if (aio && uipc_lock_peer(so, &unp2) == 0) {
+				struct socket *so2 = unp2->unp_socket;
+
+				SOCK_SENDBUF_LOCK(so2);
+				so2->so_snd.sb_ccc -= datalen;
+				sowakeup_aio(so2, SO_SND);
+				SOCK_SENDBUF_UNLOCK(so2);
+				UNP_PCB_UNLOCK(unp2);
+			}
+		} else
+			SOCK_RECVBUF_UNLOCK(so);
+	} else
+		SOCK_RECVBUF_UNLOCK(so);
+
+	while (control != NULL && control->m_type == MT_CONTROL) {
+		if (!peek) {
+			struct mbuf *c;
+
+			/*
+			 * unp_externalize() failure must abort entire read(2).
+			 * Such failure should also free the problematic
+			 * control, but link back the remaining data to the head
+			 * of the buffer, so that the socket is not left in a
+			 * state where it can't progress forward with reading.
+			 * The probability of such a failure is really low, so
+			 * it is fine that we need to perform a pretty complex
+			 * operation here to reconstruct the buffer.
+			 * XXXGL: unp_externalize() used to be
+			 * dom_externalize() KBI and it frees whole chain, so
+			 * we need to feed it with mbufs one by one.
+			 */
+			c = control;
+			control = STAILQ_NEXT(c, m_stailq);
+			STAILQ_NEXT(c, m_stailq) = NULL;
+			error = unp_externalize(c, controlp, flags);
+			if (__predict_false(error && control != NULL)) {
+				struct mchain cmc;
+
+				mc_init_m(&cmc, control);
+
+				SOCK_RECVBUF_LOCK(so);
+				MPASS(!(sb->sb_state & SBS_CANTRCVMORE));
+
+				if (__predict_false(cmc.mc_len + sb->sb_ccc +
+				    sb->sb_ctl > sb->sb_hiwat)) {
+					/*
+					 * Too bad, while unp_externalize() was
+					 * failing, the other side had filled
+					 * the buffer and we can't prepend data
+					 * back. Losing data!
+					 */
+					SOCK_RECVBUF_UNLOCK(so);
+					SOCK_IO_RECV_UNLOCK(so);
+					unp_scan(mc_first(&cmc),
+					    unp_freerights);
+					mc_freem(&cmc);
+					return (error);
+				}
+
+				UIPC_STREAM_SBCHECK(sb);
+				/* XXXGL: STAILQ_PREPEND */
+				STAILQ_CONCAT(&cmc.mc_q, &sb->uxst_mbq);
+				STAILQ_SWAP(&cmc.mc_q, &sb->uxst_mbq, mbuf);
+
+				sb->sb_ctl = sb->sb_acc = sb->sb_ccc =
+				    sb->sb_mbcnt = 0;
+				STAILQ_FOREACH(m, &sb->uxst_mbq, m_stailq) {
+					if (m->m_type == MT_DATA) {
+						sb->sb_acc += m->m_len;
+						sb->sb_ccc += m->m_len;
+					} else {
+						sb->sb_ctl += m->m_len;
+					}
+					sb->sb_mbcnt += MSIZE;
+					if (m->m_flags & M_EXT)
+						sb->sb_mbcnt +=
+						    m->m_ext.ext_size;
+				}
+				UIPC_STREAM_SBCHECK(sb);
+				SOCK_RECVBUF_UNLOCK(so);
+				SOCK_IO_RECV_UNLOCK(so);
+				return (error);
+			}
+			if (controlp != NULL) {
+				while (*controlp != NULL)
+					controlp = &(*controlp)->m_next;
+			}
+		} else {
+			/*
+			 * XXXGL
+			 *
+			 * In the MSG_PEEK case control is not externalized.
+			 * This means we are leaking some kernel pointers to
+			 * userland.  They are useless to a law-abiding
+			 * application, but may be useful to malware.  This
+			 * is what the historical implementation in
+			 * soreceive_generic() did.  To be improved?
+			 */
+			if (controlp != NULL) {
+				*controlp = m_copym(control, 0, control->m_len,
+				    M_WAITOK);
+				controlp = &(*controlp)->m_next;
+			}
+			control = STAILQ_NEXT(control, m_stailq);
+		}
+	}
+
+	for (m = first; m != last; m = next) {
+		next = STAILQ_NEXT(m, m_stailq);
+		error = uiomove(mtod(m, char *), m->m_len, uio);
+		if (__predict_false(error)) {
+			SOCK_IO_RECV_UNLOCK(so);
+			if (!peek)
+				for (; m != last; m = next) {
+					next = STAILQ_NEXT(m, m_stailq);
+					m_free(m);
+				}
+			return (error);
+		}
+		if (!peek)
+			m_free(m);
+	}
+	if (last != NULL && lastlen > 0) {
+		if (!peek) {
+			MPASS(!(m->m_flags & M_PKTHDR));
+			MPASS(last->m_data - M_START(last) >= lastlen);
+			error = uiomove(mtod(last, char *) - lastlen,
+			    lastlen, uio);
+		} else
+			error = uiomove(mtod(last, char *), lastlen, uio);
+		if (__predict_false(error)) {
+			SOCK_IO_RECV_UNLOCK(so);
+			return (error);
+		}
+	}
+	if (waitall && !(flags & MSG_EOR) && uio->uio_resid > 0)
+		goto restart;
+	SOCK_IO_RECV_UNLOCK(so);
+
+	if (flagsp != NULL)
+		*flagsp |= flags;
+
+	uio->uio_td->td_ru.ru_msgrcv++;
+
+	return (0);
+}
+
+static int
+uipc_sopoll_stream_or_seqpacket(struct socket *so, int events,
+    struct thread *td)
+{
+	struct unpcb *unp = sotounpcb(so);
+	int revents;
+
 	UNP_PCB_LOCK(unp);
-	unp2 = unp->unp_conn;
-	if (unp2 == NULL) {
+	if (SOLISTENING(so)) {
+		/* The above check is safe, since conversion to listening uses
+		 * both protocol and socket lock.
+		 */
+		SOCK_LOCK(so);
+		if (!(events & (POLLIN | POLLRDNORM)))
+			revents = 0;
+		else if (!TAILQ_EMPTY(&so->sol_comp))
+			revents = events & (POLLIN | POLLRDNORM);
+		else if (so->so_error)
+			revents = (events & (POLLIN | POLLRDNORM)) | POLLHUP;
+		else {
+			selrecord(td, &so->so_rdsel);
+			revents = 0;
+		}
+		SOCK_UNLOCK(so);
+	} else {
+		if (so->so_state & SS_ISDISCONNECTED)
+			revents = POLLHUP;
+		else
+			revents = 0;
+		if (events & (POLLIN | POLLRDNORM | POLLRDHUP)) {
+			SOCK_RECVBUF_LOCK(so);
+			if (sbavail(&so->so_rcv) >= so->so_rcv.sb_lowat ||
+			    so->so_error || so->so_rerror)
+				revents |= events & (POLLIN | POLLRDNORM);
+			if (so->so_rcv.sb_state & SBS_CANTRCVMORE)
+				revents |= events & POLLRDHUP;
+			if (!(revents & (POLLIN | POLLRDNORM | POLLRDHUP))) {
+				selrecord(td, &so->so_rdsel);
*** 1285 LINES SKIPPED ***


