From: Rick Macklem <rmacklem@FreeBSD.org>
Date: Sun, 21 Jun 2020 00:06:04 +0000 (UTC)
To: src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org
Subject: svn commit: r362455 - head/sys/rpc
Message-Id: <202006210006.05L064Cf064097@repo.freebsd.org>

Author: rmacklem
Date: Sun Jun 21 00:06:04 2020
New Revision: 362455
URL: https://svnweb.freebsd.org/changeset/base/362455

Log:
  Modify the way the client side krpc does soreceive() for TCP.
  
  Without this patch, clnt_vc_soupcall() first does a soreceive() for
  4 bytes (the Sun RPC over TCP record mark) and then a soreceive() for
  the RPC message.  The first soreceive() almost always results in an
  mbuf allocation, since the 4-byte record mark rarely sits in a
  separate mbuf in the socket receive queue.  This is somewhat
  inefficient and rather odd.  It also will not work for ktls receive,
  since the latter returns a TLS record for each soreceive().
  
  This patch replaces the above with code similar to what the server
  side of the krpc does for TCP: do a soreceive() for as much data as
  possible and then parse RPC messages out of the received data.  A new
  field of the krpc TCP connection structure (struct ct_data) called
  ct_raw holds the list of received mbufs that the RPC message(s) are
  parsed from.  I think this results in cleaner code, and it is needed
  for support of nfs-over-tls.
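  For reference, the record mark is a 4-byte big-endian word in which
  the high-order bit flags the last fragment of an RPC record and the
  low-order 31 bits give the fragment length.  A minimal sketch of the
  decoding (illustrative names only, not the kernel code, which does
  the equivalent with m_copydata()/m_adj() on ct_raw):
  
  	#include <stdint.h>
  	#include <arpa/inet.h>
  
  	/* Decode a Sun RPC over TCP record mark (RFC 5531). */
  	static uint32_t
  	decode_record_mark(uint32_t header_be, int *last_frag)
  	{
  		uint32_t header = ntohl(header_be);
  
  		*last_frag = (header & 0x80000000) != 0;
  		return (header & 0x7fffffff);	/* fragment length */
  	}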
  It also fixes the code for the case where a server sends an RPC
  message in multiple RPC message fragments.  Although this is allowed
  by RFC 5531, no extant NFS server does it.  However, it is probably
  good to fix this in case some future NFS server does.

Modified:
  head/sys/rpc/clnt_vc.c
  head/sys/rpc/krpc.h

Modified: head/sys/rpc/clnt_vc.c
==============================================================================
--- head/sys/rpc/clnt_vc.c	Sat Jun 20 23:48:57 2020	(r362454)
+++ head/sys/rpc/clnt_vc.c	Sun Jun 21 00:06:04 2020	(r362455)
@@ -269,6 +269,7 @@ clnt_vc_create(
 	soupcall_set(ct->ct_socket, SO_RCV, clnt_vc_soupcall, ct);
 	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
 
+	ct->ct_raw = NULL;
 	ct->ct_record = NULL;
 	ct->ct_record_resid = 0;
 	TAILQ_INIT(&ct->ct_pending);
@@ -826,6 +827,8 @@ clnt_vc_destroy(CLIENT *cl)
 		soshutdown(so, SHUT_WR);
 		soclose(so);
 	}
+	m_freem(ct->ct_record);
+	m_freem(ct->ct_raw);
 	mem_free(ct, sizeof(struct ct_data));
 	if (cl->cl_netid && cl->cl_netid[0])
 		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
@@ -854,122 +857,118 @@ clnt_vc_soupcall(struct socket *so, void *arg, int wai
 	struct ct_request *cr;
 	int error, rcvflag, foundreq;
 	uint32_t xid_plus_direction[2], header;
-	bool_t do_read;
 	SVCXPRT *xprt;
 	struct cf_conn *cd;
+	u_int rawlen;
 
-	CTASSERT(sizeof(xid_plus_direction) == 2 * sizeof(uint32_t));
+	/*
+	 * If another thread is already here, it must be in
+	 * soreceive(), so just return to avoid races with it.
+	 * ct_upcallrefs is protected by the SOCKBUF_LOCK(),
+	 * which is held in this function, except when
+	 * soreceive() is called.
+	 */
+	if (ct->ct_upcallrefs > 0)
+		return (SU_OK);
 	ct->ct_upcallrefs++;
-	uio.uio_td = curthread;
-	do {
-		/*
-		 * If ct_record_resid is zero, we are waiting for a
-		 * record mark.
-		 */
-		if (ct->ct_record_resid == 0) {
+	/*
+	 * Read as much as possible off the socket and link it
+	 * onto ct_raw.
+	 */
+	for (;;) {
+		uio.uio_resid = 1000000000;
+		uio.uio_td = curthread;
+		m2 = m = NULL;
+		rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
+		SOCKBUF_UNLOCK(&so->so_rcv);
+		error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
+		SOCKBUF_LOCK(&so->so_rcv);
+
+		if (error == EWOULDBLOCK) {
 			/*
-			 * Make sure there is either a whole record
-			 * mark in the buffer or there is some other
-			 * error condition
+			 * We must re-test for readability after
+			 * taking the lock to protect us in the case
+			 * where a new packet arrives on the socket
+			 * after our call to soreceive fails with
+			 * EWOULDBLOCK.
 			 */
-			do_read = FALSE;
-			if (sbavail(&so->so_rcv) >= sizeof(uint32_t)
-			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
-			    || so->so_error)
-				do_read = TRUE;
-
-			if (!do_read)
+			error = 0;
+			if (!soreadable(so))
 				break;
+			continue;
+		}
+		if (error == 0 && m == NULL) {
+			/*
+			 * We must have got EOF trying
+			 * to read from the stream.
			 */
+			error = ECONNRESET;
+		}
+		if (error != 0)
+			break;
 
-			SOCKBUF_UNLOCK(&so->so_rcv);
-			uio.uio_resid = sizeof(uint32_t);
-			m = NULL;
-			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
-			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
-			SOCKBUF_LOCK(&so->so_rcv);
+		if (ct->ct_raw != NULL)
+			m_last(ct->ct_raw)->m_next = m;
+		else
+			ct->ct_raw = m;
+	}
+	rawlen = m_length(ct->ct_raw, NULL);
 
-			if (error == EWOULDBLOCK)
+	/* Now, process as much of ct_raw as possible. */
+	for (;;) {
+		/*
+		 * If ct_record_resid is zero, we are waiting for a
+		 * record mark.
+		 */
+		if (ct->ct_record_resid == 0) {
+			if (rawlen < sizeof(uint32_t))
 				break;
-
-			/*
-			 * If there was an error, wake up all pending
-			 * requests.
-			 */
-			if (error || uio.uio_resid > 0) {
-		wakeup_all:
-				mtx_lock(&ct->ct_lock);
-				if (!error) {
-					/*
-					 * We must have got EOF trying
-					 * to read from the stream.
-					 */
-					error = ECONNRESET;
-				}
-				ct->ct_error.re_status = RPC_CANTRECV;
-				ct->ct_error.re_errno = error;
-				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
-					cr->cr_error = error;
-					wakeup(cr);
-				}
-				mtx_unlock(&ct->ct_lock);
-				break;
-			}
-			m_copydata(m, 0, sizeof(uint32_t), (char *)&header);
+			m_copydata(ct->ct_raw, 0, sizeof(uint32_t),
+			    (char *)&header);
 			header = ntohl(header);
-			ct->ct_record = NULL;
 			ct->ct_record_resid = header & 0x7fffffff;
 			ct->ct_record_eor = ((header & 0x80000000) != 0);
-			m_freem(m);
+			m_adj(ct->ct_raw, sizeof(uint32_t));
+			rawlen -= sizeof(uint32_t);
 		} else {
 			/*
-			 * Wait until the socket has the whole record
-			 * buffered.
+			 * Move as much of the record as possible to
+			 * ct_record.
 			 */
-			do_read = FALSE;
-			if (sbavail(&so->so_rcv) >= ct->ct_record_resid
-			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
-			    || so->so_error)
-				do_read = TRUE;
-
-			if (!do_read)
+			if (rawlen == 0)
 				break;
-
-			/*
-			 * We have the record mark. Read as much as
-			 * the socket has buffered up to the end of
-			 * this record.
-			 */
-			SOCKBUF_UNLOCK(&so->so_rcv);
-			uio.uio_resid = ct->ct_record_resid;
-			m = NULL;
-			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
-			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
-			SOCKBUF_LOCK(&so->so_rcv);
-
-			if (error == EWOULDBLOCK)
+			if (rawlen <= ct->ct_record_resid) {
+				if (ct->ct_record != NULL)
+					m_last(ct->ct_record)->m_next =
+					    ct->ct_raw;
+				else
+					ct->ct_record = ct->ct_raw;
+				ct->ct_raw = NULL;
+				ct->ct_record_resid -= rawlen;
+				rawlen = 0;
+			} else {
+				m = m_split(ct->ct_raw, ct->ct_record_resid,
+				    M_NOWAIT);
+				if (m == NULL)
+					break;
+				if (ct->ct_record != NULL)
+					m_last(ct->ct_record)->m_next =
+					    ct->ct_raw;
+				else
+					ct->ct_record = ct->ct_raw;
+				rawlen -= ct->ct_record_resid;
+				ct->ct_record_resid = 0;
+				ct->ct_raw = m;
+			}
+			if (ct->ct_record_resid > 0)
 				break;
 
-			if (error || uio.uio_resid == ct->ct_record_resid)
-				goto wakeup_all;
-
-			/*
-			 * If we have part of the record already,
-			 * chain this bit onto the end.
-			 */
-			if (ct->ct_record)
-				m_last(ct->ct_record)->m_next = m;
-			else
-				ct->ct_record = m;
-
-			ct->ct_record_resid = uio.uio_resid;
-
 			/*
 			 * If we have the entire record, see if we can
 			 * match it to a request.
 			 */
-			if (ct->ct_record_resid == 0
-			    && ct->ct_record_eor) {
+			if (ct->ct_record_eor) {
 				/*
 				 * The XID is in the first uint32_t of
 				 * the reply and the message direction
@@ -979,8 +978,20 @@ clnt_vc_soupcall(struct socket *so, void *arg, int wai
 				    sizeof(xid_plus_direction) &&
 				    m_length(ct->ct_record, NULL) <
 				    sizeof(xid_plus_direction)) {
-					m_freem(ct->ct_record);
-					break;
+					/*
+					 * What to do now?
+					 * The data in the TCP stream is
+					 * corrupted such that there is no
+					 * valid RPC message to parse.
+					 * I think it best to close this
+					 * connection and allow
+					 * clnt_reconnect_XXX() to try
+					 * and establish a new one.
+					 */
+					printf("clnt_vc_soupcall: "
+					    "connection data corrupted\n");
+					error = ECONNRESET;
+					goto wakeup_all;
 				}
 				m_copydata(ct->ct_record, 0,
 				    sizeof(xid_plus_direction),
@@ -1057,7 +1068,26 @@ clnt_vc_soupcall(struct socket *so, void *arg, int wai
 				}
 			}
 		}
-	} while (m);
+	}
+
+	if (error != 0) {
+wakeup_all:
+		/*
+		 * This socket is broken, so mark that it cannot
+		 * receive and fail all RPCs waiting for a reply
+		 * on it, so that they will be retried on a new
+		 * TCP connection created by clnt_reconnect_X().
+		 */
+		mtx_lock(&ct->ct_lock);
+		ct->ct_error.re_status = RPC_CANTRECV;
+		ct->ct_error.re_errno = error;
+		TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
+			cr->cr_error = error;
+			wakeup(cr);
+		}
+		mtx_unlock(&ct->ct_lock);
+	}
+
 	ct->ct_upcallrefs--;
 	if (ct->ct_upcallrefs < 0)
 		panic("rpcvc upcall refcnt");

Modified: head/sys/rpc/krpc.h
==============================================================================
--- head/sys/rpc/krpc.h	Sat Jun 20 23:48:57 2020	(r362454)
+++ head/sys/rpc/krpc.h	Sun Jun 21 00:06:04 2020	(r362455)
@@ -101,6 +101,7 @@ struct ct_data {
 	struct ct_request_list ct_pending;
 	int ct_upcallrefs;	/* Ref cnt of upcalls in prog. */
 	SVCXPRT *ct_backchannelxprt; /* xprt for backchannel */
+	struct mbuf *ct_raw;	/* Raw mbufs recv'd */
 };
 
 struct cf_conn {  /* kept in xprt->xp_p1 for actual connection */
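To make the new parsing scheme concrete, here is a rough userspace
sketch of the second loop in clnt_vc_soupcall() above, using a flat
byte buffer in place of the ct_raw mbuf list.  struct raw_stream and
the commented-out helpers are illustrative only; the kernel code does
the same splitting with m_copydata(), m_adj() and m_split():

	#include <stdint.h>
	#include <string.h>
	#include <arpa/inet.h>

	/* Illustrative flat-buffer stand-in for the ct_raw mbuf list. */
	struct raw_stream {
		uint8_t	*data;	/* bytes received but not yet parsed */
		size_t	 len;	/* number of valid bytes at data */
		size_t	 resid;	/* bytes still owed to current record */
		int	 eor;	/* current fragment is the last one */
	};

	/*
	 * Parse as many RPC record fragments as possible out of the
	 * raw stream: consume a 4-byte record mark when resid == 0,
	 * otherwise move up to resid bytes into the record being
	 * assembled.  A complete record (eor set, resid 0) would then
	 * be matched to a pending request by its XID.  The caller is
	 * assumed to refill/compact data between calls.
	 */
	static void
	parse_records(struct raw_stream *rs)
	{
		uint32_t header;
		size_t n;

		for (;;) {
			if (rs->resid == 0) {
				/* Waiting for a record mark. */
				if (rs->len < sizeof(uint32_t))
					break;
				memcpy(&header, rs->data, sizeof(uint32_t));
				header = ntohl(header);
				rs->resid = header & 0x7fffffff;
				rs->eor = (header & 0x80000000) != 0;
				rs->data += sizeof(uint32_t);
				rs->len -= sizeof(uint32_t);
			} else {
				/* Move as much of the record as possible. */
				n = rs->len < rs->resid ? rs->len : rs->resid;
				if (n == 0)
					break;
				/* append_to_record(rs->data, n); -- hypothetical */
				rs->data += n;
				rs->len -= n;
				rs->resid -= n;
				if (rs->resid == 0 && rs->eor) {
					/* complete_record(); -- match XID here */
				}
			}
		}
	}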