Date: Mon, 5 Oct 2009 14:49:16 +0000 (UTC) From: Robert Watson <rwatson@FreeBSD.org> To: src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org Subject: svn commit: r197775 - head/sys/kern Message-ID: <200910051449.n95EnGof015525@svn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: rwatson Date: Mon Oct 5 14:49:16 2009 New Revision: 197775 URL: http://svn.freebsd.org/changeset/base/197775 Log: First cut at implementing SOCK_SEQPACKET support for UNIX (local) domain sockets. This allows for reliable bi-directional datagram communication over UNIX domain sockets, in contrast to SOCK_DGRAM (M:N, unreliable) or SOCK_STERAM (bi-directional bytestream). Largely, this reuses existing UNIX domain socket code. This allows applications requiring record- oriented semantics to do so reliably via local IPC. Some implementation notes (also present in XXX comments): - Currently we lack an sbappend variant able to do datagrams and control data without doing addresses, so we mark SOCK_SEQPACKET as PR_ADDR. Adding a new variant will solve this problem. - UNIX domain sockets on FreeBSD provide back-pressure/flow control notification for stream sockets by manipulating the send socket buffer's size during pru_send and pru_rcvd. This trick works less well for SOCK_SEQPACKET as sosend_generic() uses sb_hiwat not just to manage blocking, but also to determine maximum datagram size. Fixing this requires rethinking how back-pressure is done for SOCK_SEQPACKET; in the mean time, it's possible to get EMSGSIZE when buffers fill, instead of blocking. Discussed with: benl Reviewed by: bz, rpaulo MFC after: 3 months Sponsored by: Google Modified: head/sys/kern/uipc_usrreq.c Modified: head/sys/kern/uipc_usrreq.c ============================================================================== --- head/sys/kern/uipc_usrreq.c Mon Oct 5 14:46:56 2009 (r197774) +++ head/sys/kern/uipc_usrreq.c Mon Oct 5 14:49:16 2009 (r197775) @@ -50,7 +50,8 @@ * garbage collector to find and tear down cycles of disconnected sockets. * * TODO: - * SEQPACKET, RDM + * RDM + * distinguish datagram size limits from flow control limits in SEQPACKET * rethink name space problems * need a proper out-of-band */ @@ -112,6 +113,7 @@ static ino_t unp_ino; /* Prototype for static int unp_rights; /* (g) File descriptors in flight. */ static struct unp_head unp_shead; /* (l) List of stream sockets. */ static struct unp_head unp_dhead; /* (l) List of datagram sockets. */ +static struct unp_head unp_sphead; /* (l) List of seqpacket sockets. */ static const struct sockaddr sun_noname = { sizeof(sun_noname), AF_LOCAL }; @@ -139,10 +141,14 @@ static u_long unpst_sendspace = PIPSIZ; static u_long unpst_recvspace = PIPSIZ; static u_long unpdg_sendspace = 2*1024; /* really max datagram size */ static u_long unpdg_recvspace = 4*1024; +static u_long unpsp_sendspace = PIPSIZ; /* really max datagram size */ +static u_long unpsp_recvspace = PIPSIZ; SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW, 0, "Local domain"); SYSCTL_NODE(_net_local, SOCK_STREAM, stream, CTLFLAG_RW, 0, "SOCK_STREAM"); SYSCTL_NODE(_net_local, SOCK_DGRAM, dgram, CTLFLAG_RW, 0, "SOCK_DGRAM"); +SYSCTL_NODE(_net_local, SOCK_SEQPACKET, seqpacket, CTLFLAG_RW, 0, + "SOCK_SEQPACKET"); SYSCTL_ULONG(_net_local_stream, OID_AUTO, sendspace, CTLFLAG_RW, &unpst_sendspace, 0, "Default stream send space."); @@ -152,6 +158,10 @@ SYSCTL_ULONG(_net_local_dgram, OID_AUTO, &unpdg_sendspace, 0, "Default datagram send space."); SYSCTL_ULONG(_net_local_dgram, OID_AUTO, recvspace, CTLFLAG_RW, &unpdg_recvspace, 0, "Default datagram receive space."); +SYSCTL_ULONG(_net_local_seqpacket, OID_AUTO, maxseqpacket, CTLFLAG_RW, + &unpsp_sendspace, 0, "Default seqpacket send space."); +SYSCTL_ULONG(_net_local_seqpacket, OID_AUTO, recvspace, CTLFLAG_RW, + &unpsp_recvspace, 0, "Default seqpacket receive space."); SYSCTL_INT(_net_local, OID_AUTO, inflight, CTLFLAG_RD, &unp_rights, 0, "File descriptors in flight."); @@ -257,6 +267,7 @@ static struct mbuf *unp_addsockcred(stru */ static struct domain localdomain; static struct pr_usrreqs uipc_usrreqs_dgram, uipc_usrreqs_stream; +static struct pr_usrreqs uipc_usrreqs_seqpacket; static struct protosw localsw[] = { { .pr_type = SOCK_STREAM, @@ -271,6 +282,19 @@ static struct protosw localsw[] = { .pr_flags = PR_ATOMIC|PR_ADDR|PR_RIGHTS, .pr_usrreqs = &uipc_usrreqs_dgram }, +{ + .pr_type = SOCK_SEQPACKET, + .pr_domain = &localdomain, + + /* + * XXXRW: For now, PR_ADDR because soreceive will bump into them + * due to our use of sbappendaddr. A new sbappend variants is needed + * that supports both atomic record writes and control data. + */ + .pr_flags = PR_ADDR|PR_ATOMIC|PR_CONNREQUIRED|PR_WANTRCVD| + PR_RIGHTS, + .pr_usrreqs = &uipc_usrreqs_seqpacket, +}, }; static struct domain localdomain = { @@ -353,6 +377,11 @@ uipc_attach(struct socket *so, int proto recvspace = unpdg_recvspace; break; + case SOCK_SEQPACKET: + sendspace = unpsp_sendspace; + recvspace = unpsp_recvspace; + break; + default: panic("uipc_attach"); } @@ -372,8 +401,22 @@ uipc_attach(struct socket *so, int proto UNP_LIST_LOCK(); unp->unp_gencnt = ++unp_gencnt; unp_count++; - LIST_INSERT_HEAD(so->so_type == SOCK_DGRAM ? &unp_dhead : &unp_shead, - unp, unp_link); + switch (so->so_type) { + case SOCK_STREAM: + LIST_INSERT_HEAD(&unp_shead, unp, unp_link); + break; + + case SOCK_DGRAM: + LIST_INSERT_HEAD(&unp_dhead, unp, unp_link); + break; + + case SOCK_SEQPACKET: + LIST_INSERT_HEAD(&unp_sphead, unp, unp_link); + break; + + default: + panic("uipc_attach"); + } UNP_LIST_UNLOCK(); return (0); @@ -705,11 +748,8 @@ uipc_rcvd(struct socket *so, int flags) unp = sotounpcb(so); KASSERT(unp != NULL, ("uipc_rcvd: unp == NULL")); - if (so->so_type == SOCK_DGRAM) - panic("uipc_rcvd DGRAM?"); - - if (so->so_type != SOCK_STREAM) - panic("uipc_rcvd unknown socktype"); + if (so->so_type != SOCK_STREAM && so->so_type != SOCK_SEQPACKET) + panic("uipc_rcvd socktype %d", so->so_type); /* * Adjust backpressure on sender and wakeup any waiting to write. @@ -824,6 +864,7 @@ uipc_send(struct socket *so, int flags, break; } + case SOCK_SEQPACKET: case SOCK_STREAM: if ((so->so_state & SS_ISCONNECTED) == 0) { if (nam != NULL) { @@ -875,11 +916,33 @@ uipc_send(struct socket *so, int flags, * Send to paired receive port, and then reduce send buffer * hiwater marks to maintain backpressure. Wake up readers. */ - if (control != NULL) { - if (sbappendcontrol_locked(&so2->so_rcv, m, control)) + switch (so->so_type) { + case SOCK_STREAM: + if (control != NULL) { + if (sbappendcontrol_locked(&so2->so_rcv, m, + control)) + control = NULL; + } else + sbappend_locked(&so2->so_rcv, m); + break; + + case SOCK_SEQPACKET: { + const struct sockaddr *from; + + from = &sun_noname; + if (sbappendaddr_locked(&so2->so_rcv, from, m, + control)) control = NULL; - } else - sbappend_locked(&so2->so_rcv, m); + break; + } + } + + /* + * XXXRW: While fine for SOCK_STREAM, this conflates maximum + * datagram size and back-pressure for SOCK_SEQPACKET, which + * can lead to undesired return of EMSGSIZE on send instead + * of more desirable blocking. + */ mbcnt_delta = so2->so_rcv.sb_mbcnt - unp2->unp_mbcnt; unp2->unp_mbcnt = so2->so_rcv.sb_mbcnt; sbcc = so2->so_rcv.sb_cc; @@ -939,7 +1002,8 @@ uipc_sense(struct socket *so, struct sta UNP_LINK_RLOCK(); UNP_PCB_LOCK(unp); unp2 = unp->unp_conn; - if (so->so_type == SOCK_STREAM && unp2 != NULL) { + if ((so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET) && + unp2 != NULL) { so2 = unp2->unp_socket; sb->st_blksize += so2->so_rcv.sb_cc; } @@ -1009,6 +1073,26 @@ static struct pr_usrreqs uipc_usrreqs_dg .pru_close = uipc_close, }; +static struct pr_usrreqs uipc_usrreqs_seqpacket = { + .pru_abort = uipc_abort, + .pru_accept = uipc_accept, + .pru_attach = uipc_attach, + .pru_bind = uipc_bind, + .pru_connect = uipc_connect, + .pru_connect2 = uipc_connect2, + .pru_detach = uipc_detach, + .pru_disconnect = uipc_disconnect, + .pru_listen = uipc_listen, + .pru_peeraddr = uipc_peeraddr, + .pru_rcvd = uipc_rcvd, + .pru_send = uipc_send, + .pru_sense = uipc_sense, + .pru_shutdown = uipc_shutdown, + .pru_sockaddr = uipc_sockaddr, + .pru_soreceive = soreceive_generic, /* XXX: or...? */ + .pru_close = uipc_close, +}; + static struct pr_usrreqs uipc_usrreqs_stream = { .pru_abort = uipc_abort, .pru_accept = uipc_accept, @@ -1306,6 +1390,7 @@ unp_connect2(struct socket *so, struct s break; case SOCK_STREAM: + case SOCK_SEQPACKET: unp2->unp_conn = unp; if (req == PRU_CONNECT && ((unp->unp_flags | unp2->unp_flags) & UNP_CONNWAIT)) @@ -1343,6 +1428,7 @@ unp_disconnect(struct unpcb *unp, struct break; case SOCK_STREAM: + case SOCK_SEQPACKET: soisdisconnected(unp->unp_socket); unp2->unp_conn = NULL; soisdisconnected(unp2->unp_socket); @@ -1368,7 +1454,22 @@ unp_pcblist(SYSCTL_HANDLER_ARGS) struct unp_head *head; struct xunpcb *xu; - head = ((intptr_t)arg1 == SOCK_DGRAM ? &unp_dhead : &unp_shead); + switch ((intptr_t)arg1) { + case SOCK_STREAM: + head = &unp_shead; + break; + + case SOCK_DGRAM: + head = &unp_dhead; + break; + + case SOCK_SEQPACKET: + head = &unp_sphead; + break; + + default: + panic("unp_pcblist: arg1 %d", (intptr_t)arg1); + } /* * The process of preparing the PCB list is too time-consuming and @@ -1481,6 +1582,9 @@ SYSCTL_PROC(_net_local_dgram, OID_AUTO, SYSCTL_PROC(_net_local_stream, OID_AUTO, pcblist, CTLFLAG_RD, (caddr_t)(long)SOCK_STREAM, 0, unp_pcblist, "S,xunpcb", "List of active local stream sockets"); +SYSCTL_PROC(_net_local_seqpacket, OID_AUTO, pcblist, CTLFLAG_RD, + (caddr_t)(long)SOCK_SEQPACKET, 0, unp_pcblist, "S,xunpcb", + "List of active local seqpacket sockets"); static void unp_shutdown(struct unpcb *unp) @@ -1492,7 +1596,8 @@ unp_shutdown(struct unpcb *unp) UNP_PCB_LOCK_ASSERT(unp); unp2 = unp->unp_conn; - if (unp->unp_socket->so_type == SOCK_STREAM && unp2 != NULL) { + if ((unp->unp_socket->so_type == SOCK_STREAM || + (unp->unp_socket->so_type == SOCK_SEQPACKET)) && unp2 != NULL) { so = unp2->unp_socket; if (so != NULL) socantrcvmore(so); @@ -1658,6 +1763,7 @@ unp_init(void) NULL, EVENTHANDLER_PRI_ANY); LIST_INIT(&unp_dhead); LIST_INIT(&unp_shead); + LIST_INIT(&unp_sphead); TASK_INIT(&unp_gc_task, 0, unp_gc, NULL); UNP_LINK_LOCK_INIT(); UNP_LIST_LOCK_INIT(); @@ -1974,7 +2080,8 @@ SYSCTL_INT(_net_local, OID_AUTO, taskcou static void unp_gc(__unused void *arg, int pending) { - struct unp_head *heads[] = { &unp_dhead, &unp_shead, NULL }; + struct unp_head *heads[] = { &unp_dhead, &unp_shead, &unp_sphead, + NULL }; struct unp_head **head; struct file **unref; struct unpcb *unp;
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?200910051449.n95EnGof015525>