Date: Wed, 22 Jan 2014 23:46:19 +0000 (UTC) From: Alexander Motin <mav@FreeBSD.org> To: src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-stable@freebsd.org, svn-src-stable-10@freebsd.org Subject: svn commit: r261047 - stable/10/sys/rpc Message-ID: <201401222346.s0MNkJFF025307@svn.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: mav Date: Wed Jan 22 23:46:19 2014 New Revision: 261047 URL: http://svnweb.freebsd.org/changeset/base/261047 Log: MFC r259632: Rework flow control for connection-oriented (TCP) RPC server. When processing receive buffer, write the amount of data, expected in present request record, into socket's so_rcv.sb_lowat to make stack aware about our needs. When processing following upcalls, ignore them until socket collect enough data to be read and processed in one turn. This change reduces number of context switches and other operations in RPC stack during large NFS writes (especially via non-Jumbo networks) by order of magnitude. After precessing current packet, take another look into the pending buffer to find out whether the next packet had been already received. If not, deactivate this port right there without making RPC code to push this port to another thread just to find that there is nothing. If the next packet is received partially, also deactivate the port, but also update socket's so_rcv.sb_lowat to not be woken up prematurely. This change additionally reduces number of context switches per NFS request about in half. Modified: stable/10/sys/rpc/svc_vc.c Directory Properties: stable/10/ (props changed) Modified: stable/10/sys/rpc/svc_vc.c ============================================================================== --- stable/10/sys/rpc/svc_vc.c Wed Jan 22 23:45:27 2014 (r261046) +++ stable/10/sys/rpc/svc_vc.c Wed Jan 22 23:46:19 2014 (r261047) @@ -381,15 +381,11 @@ svc_vc_rendezvous_recv(SVCXPRT *xprt, st * We must re-test for new connections after taking * the lock to protect us in the case where a new * connection arrives after our call to accept fails - * with EWOULDBLOCK. The pool lock protects us from - * racing the upcall after our TAILQ_EMPTY() call - * returns false. + * with EWOULDBLOCK. */ ACCEPT_LOCK(); - mtx_lock(&xprt->xp_pool->sp_lock); if (TAILQ_EMPTY(&xprt->xp_socket->so_comp)) - xprt_inactive_locked(xprt); - mtx_unlock(&xprt->xp_pool->sp_lock); + xprt_inactive(xprt); ACCEPT_UNLOCK(); sx_xunlock(&xprt->xp_lock); return (FALSE); @@ -526,35 +522,14 @@ static enum xprt_stat svc_vc_stat(SVCXPRT *xprt) { struct cf_conn *cd; - struct mbuf *m; - size_t n; cd = (struct cf_conn *)(xprt->xp_p1); if (cd->strm_stat == XPRT_DIED) return (XPRT_DIED); - /* - * Return XPRT_MOREREQS if we have buffered data and we are - * mid-record or if we have enough data for a record - * marker. Since this is only a hint, we read mpending and - * resid outside the lock. We do need to take the lock if we - * have to traverse the mbuf chain. - */ - if (cd->mpending) { - if (cd->resid) - return (XPRT_MOREREQS); - n = 0; - sx_xlock(&xprt->xp_lock); - m = cd->mpending; - while (m && n < sizeof(uint32_t)) { - n += m->m_len; - m = m->m_next; - } - sx_xunlock(&xprt->xp_lock); - if (n >= sizeof(uint32_t)) - return (XPRT_MOREREQS); - } + if (cd->mreq != NULL && cd->resid == 0 && cd->eor) + return (XPRT_MOREREQS); if (soreadable(xprt->xp_socket)) return (XPRT_MOREREQS); @@ -575,6 +550,78 @@ svc_vc_backchannel_stat(SVCXPRT *xprt) return (XPRT_IDLE); } +/* + * If we have an mbuf chain in cd->mpending, try to parse a record from it, + * leaving the result in cd->mreq. If we don't have a complete record, leave + * the partial result in cd->mreq and try to read more from the socket. + */ +static void +svc_vc_process_pending(SVCXPRT *xprt) +{ + struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; + struct socket *so = xprt->xp_socket; + struct mbuf *m; + + /* + * If cd->resid is non-zero, we have part of the + * record already, otherwise we are expecting a record + * marker. + */ + if (!cd->resid && cd->mpending) { + /* + * See if there is enough data buffered to + * make up a record marker. Make sure we can + * handle the case where the record marker is + * split across more than one mbuf. + */ + size_t n = 0; + uint32_t header; + + m = cd->mpending; + while (n < sizeof(uint32_t) && m) { + n += m->m_len; + m = m->m_next; + } + if (n < sizeof(uint32_t)) { + so->so_rcv.sb_lowat = sizeof(uint32_t) - n; + return; + } + m_copydata(cd->mpending, 0, sizeof(header), + (char *)&header); + header = ntohl(header); + cd->eor = (header & 0x80000000) != 0; + cd->resid = header & 0x7fffffff; + m_adj(cd->mpending, sizeof(uint32_t)); + } + + /* + * Start pulling off mbufs from cd->mpending + * until we either have a complete record or + * we run out of data. We use m_split to pull + * data - it will pull as much as possible and + * split the last mbuf if necessary. + */ + while (cd->mpending && cd->resid) { + m = cd->mpending; + if (cd->mpending->m_next + || cd->mpending->m_len > cd->resid) + cd->mpending = m_split(cd->mpending, + cd->resid, M_WAITOK); + else + cd->mpending = NULL; + if (cd->mreq) + m_last(cd->mreq)->m_next = m; + else + cd->mreq = m; + while (m) { + cd->resid -= m->m_len; + m = m->m_next; + } + } + + so->so_rcv.sb_lowat = imax(1, imin(cd->resid, so->so_rcv.sb_hiwat / 2)); +} + static bool_t svc_vc_recv(SVCXPRT *xprt, struct rpc_msg *msg, struct sockaddr **addrp, struct mbuf **mp) @@ -582,6 +629,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms struct cf_conn *cd = (struct cf_conn *) xprt->xp_p1; struct uio uio; struct mbuf *m; + struct socket* so = xprt->xp_socket; XDR xdrs; int error, rcvflag; @@ -592,99 +640,40 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms sx_xlock(&xprt->xp_lock); for (;;) { - /* - * If we have an mbuf chain in cd->mpending, try to parse a - * record from it, leaving the result in cd->mreq. If we don't - * have a complete record, leave the partial result in - * cd->mreq and try to read more from the socket. - */ - if (cd->mpending) { - /* - * If cd->resid is non-zero, we have part of the - * record already, otherwise we are expecting a record - * marker. - */ - if (!cd->resid) { - /* - * See if there is enough data buffered to - * make up a record marker. Make sure we can - * handle the case where the record marker is - * split across more than one mbuf. - */ - size_t n = 0; - uint32_t header; - - m = cd->mpending; - while (n < sizeof(uint32_t) && m) { - n += m->m_len; - m = m->m_next; - } - if (n < sizeof(uint32_t)) - goto readmore; - m_copydata(cd->mpending, 0, sizeof(header), - (char *)&header); - header = ntohl(header); - cd->eor = (header & 0x80000000) != 0; - cd->resid = header & 0x7fffffff; - m_adj(cd->mpending, sizeof(uint32_t)); - } - - /* - * Start pulling off mbufs from cd->mpending - * until we either have a complete record or - * we run out of data. We use m_split to pull - * data - it will pull as much as possible and - * split the last mbuf if necessary. - */ - while (cd->mpending && cd->resid) { - m = cd->mpending; - if (cd->mpending->m_next - || cd->mpending->m_len > cd->resid) - cd->mpending = m_split(cd->mpending, - cd->resid, M_WAITOK); - else - cd->mpending = NULL; - if (cd->mreq) - m_last(cd->mreq)->m_next = m; - else - cd->mreq = m; - while (m) { - cd->resid -= m->m_len; - m = m->m_next; - } + /* If we have no request ready, check pending queue. */ + while (cd->mpending && + (cd->mreq == NULL || cd->resid != 0 || !cd->eor)) + svc_vc_process_pending(xprt); + + /* Process and return complete request in cd->mreq. */ + if (cd->mreq != NULL && cd->resid == 0 && cd->eor) { + + xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); + cd->mreq = NULL; + + /* Check for next request in a pending queue. */ + svc_vc_process_pending(xprt); + if (cd->mreq == NULL || cd->resid != 0) { + SOCKBUF_LOCK(&so->so_rcv); + if (!soreadable(so)) + xprt_inactive(xprt); + SOCKBUF_UNLOCK(&so->so_rcv); } - /* - * If cd->resid is zero now, we have managed to - * receive a record fragment from the stream. Check - * for the end-of-record mark to see if we need more. - */ - if (cd->resid == 0) { - if (!cd->eor) - continue; - - /* - * Success - we have a complete record in - * cd->mreq. - */ - xdrmbuf_create(&xdrs, cd->mreq, XDR_DECODE); - cd->mreq = NULL; - sx_xunlock(&xprt->xp_lock); - - if (! xdr_callmsg(&xdrs, msg)) { - XDR_DESTROY(&xdrs); - return (FALSE); - } + sx_xunlock(&xprt->xp_lock); - *addrp = NULL; - *mp = xdrmbuf_getall(&xdrs); + if (! xdr_callmsg(&xdrs, msg)) { XDR_DESTROY(&xdrs); - - return (TRUE); + return (FALSE); } + + *addrp = NULL; + *mp = xdrmbuf_getall(&xdrs); + XDR_DESTROY(&xdrs); + + return (TRUE); } - readmore: /* * The socket upcall calls xprt_active() which will eventually * cause the server to call us here. We attempt to @@ -697,8 +686,7 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms uio.uio_td = curthread; m = NULL; rcvflag = MSG_DONTWAIT; - error = soreceive(xprt->xp_socket, NULL, &uio, &m, NULL, - &rcvflag); + error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag); if (error == EWOULDBLOCK) { /* @@ -706,25 +694,23 @@ svc_vc_recv(SVCXPRT *xprt, struct rpc_ms * taking the lock to protect us in the case * where a new packet arrives on the socket * after our call to soreceive fails with - * EWOULDBLOCK. The pool lock protects us from - * racing the upcall after our soreadable() - * call returns false. + * EWOULDBLOCK. */ - mtx_lock(&xprt->xp_pool->sp_lock); - if (!soreadable(xprt->xp_socket)) - xprt_inactive_locked(xprt); - mtx_unlock(&xprt->xp_pool->sp_lock); + SOCKBUF_LOCK(&so->so_rcv); + if (!soreadable(so)) + xprt_inactive(xprt); + SOCKBUF_UNLOCK(&so->so_rcv); sx_xunlock(&xprt->xp_lock); return (FALSE); } if (error) { - SOCKBUF_LOCK(&xprt->xp_socket->so_rcv); + SOCKBUF_LOCK(&so->so_rcv); if (xprt->xp_upcallset) { xprt->xp_upcallset = 0; - soupcall_clear(xprt->xp_socket, SO_RCV); + soupcall_clear(so, SO_RCV); } - SOCKBUF_UNLOCK(&xprt->xp_socket->so_rcv); + SOCKBUF_UNLOCK(&so->so_rcv); xprt_inactive(xprt); cd->strm_stat = XPRT_DIED; sx_xunlock(&xprt->xp_lock); @@ -908,7 +894,8 @@ svc_vc_soupcall(struct socket *so, void { SVCXPRT *xprt = (SVCXPRT *) arg; - xprt_active(xprt); + if (soreadable(xprt->xp_socket)) + xprt_active(xprt); return (SU_OK); }
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?201401222346.s0MNkJFF025307>