From: John Baldwin
Date: Tue, 24 May 2016 03:13:27 +0000 (UTC)
To: src-committers@freebsd.org, svn-src-all@freebsd.org, svn-src-head@freebsd.org
Subject: svn commit: r300556 - in head: sys/kern tests/sys/aio

Author: jhb
Date: Tue May 24 03:13:27 2016
New Revision: 300556
URL: https://svnweb.freebsd.org/changeset/base/300556

Log:
  Don't prematurely return short completions on blocking sockets.

  Always requeue an AIO job at the head of the socket buffer's queue if
  sosend() or soreceive() returns EWOULDBLOCK on a blocking socket.
  Previously, requests were only requeued if they returned EWOULDBLOCK
  and completed no data.  Now after a partial completion on a blocking
  socket the request is queued and the remaining request is retried when
  the socket is ready.  This allows writes larger than the currently
  available space on a blocking socket to fully complete.  Reads on a
  blocking socket that satisfy the low watermark can still return a short
  read (same as read()).

  In order to track previously completed data, the internal 'status'
  field of the AIO job is used to store the amount of previously
  completed data.

  Non-blocking sockets continue to return short completions for both
  reads and writes.

  Add a test for a "large" AIO write on a blocking socket that writes
  twice the socket buffer size to a UNIX domain socket.

  Sponsored by:	Chelsio Communications

Modified:
  head/sys/kern/sys_socket.c
  head/tests/sys/aio/aio_test.c

Modified: head/sys/kern/sys_socket.c
==============================================================================
--- head/sys/kern/sys_socket.c	Tue May 24 03:08:32 2016	(r300555)
+++ head/sys/kern/sys_socket.c	Tue May 24 03:13:27 2016	(r300556)
@@ -556,7 +556,7 @@ soaio_process_job(struct socket *so, str
 	struct file *fp;
 	struct uio uio;
 	struct iovec iov;
-	size_t cnt;
+	size_t cnt, done;
 	int error, flags;
 
 	SOCKBUF_UNLOCK(sb);
@@ -567,8 +567,9 @@ retry:
 	td_savedcred = td->td_ucred;
 	td->td_ucred = job->cred;
 
-	cnt = job->uaiocb.aio_nbytes;
-	iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf;
+	done = job->uaiocb._aiocb_private.status;
+	cnt = job->uaiocb.aio_nbytes - done;
+	iov.iov_base = (void *)((uintptr_t)job->uaiocb.aio_buf + done);
 	iov.iov_len = cnt;
 	uio.uio_iov = &iov;
 	uio.uio_iovcnt = 1;
@@ -602,42 +603,52 @@ retry:
 		}
 	}
 
-	cnt -= uio.uio_resid;
+	done += cnt - uio.uio_resid;
+	job->uaiocb._aiocb_private.status = done;
 	td->td_ucred = td_savedcred;
 
-	if (cnt != 0 && (error == ERESTART || error == EINTR ||
-	    error == EWOULDBLOCK))
-		error = 0;
 	if (error == EWOULDBLOCK) {
 		/*
-		 * A read() or write() on the socket raced with this
-		 * request.  If the socket is now ready, try again.
-		 * If it is not, place this request at the head of the
+		 * The request was either partially completed or not
+		 * completed at all due to racing with a read() or
+		 * write() on the socket.  If the socket is
+		 * non-blocking, return with any partial completion.
+		 * If the socket is blocking or if no progress has
+		 * been made, requeue this request at the head of the
 		 * queue to try again when the socket is ready.
 		 */
-		SOCKBUF_LOCK(sb);
-		empty_results++;
-		if (soaio_ready(so, sb)) {
-			empty_retries++;
-			SOCKBUF_UNLOCK(sb);
-			goto retry;
-		}
-
-		if (!aio_set_cancel_function(job, soo_aio_cancel)) {
-			MPASS(cnt == 0);
-			SOCKBUF_UNLOCK(sb);
-			aio_cancel(job);
-			SOCKBUF_LOCK(sb);
-		} else {
-			TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
-		}
-	} else {
-		if (error)
-			aio_complete(job, -1, error);
-		else
-			aio_complete(job, cnt, 0);
+		MPASS(done != job->uaiocb.aio_nbytes);
 		SOCKBUF_LOCK(sb);
-	}
+		if (done == 0 || !(so->so_state & SS_NBIO)) {
+			empty_results++;
+			if (soaio_ready(so, sb)) {
+				empty_retries++;
+				SOCKBUF_UNLOCK(sb);
+				goto retry;
+			}
+
+			if (!aio_set_cancel_function(job, soo_aio_cancel)) {
+				SOCKBUF_UNLOCK(sb);
+				if (done != 0)
+					aio_complete(job, done, 0);
+				else
+					aio_cancel(job);
+				SOCKBUF_LOCK(sb);
+			} else {
+				TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
+			}
+			return;
+		}
+		SOCKBUF_UNLOCK(sb);
+	}
+	if (done != 0 && (error == ERESTART || error == EINTR ||
+	    error == EWOULDBLOCK))
+		error = 0;
+	if (error)
+		aio_complete(job, -1, error);
+	else
+		aio_complete(job, done, 0);
+	SOCKBUF_LOCK(sb);
 }
 
 static void
@@ -758,6 +769,7 @@ soo_aio_queue(struct file *fp, struct ka
 	if (!aio_set_cancel_function(job, soo_aio_cancel))
 		panic("new job was cancelled");
 	TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list);
+	job->uaiocb._aiocb_private.status = 0;
 	if (!(sb->sb_flags & SB_AIO_RUNNING)) {
 		if (soaio_ready(so, sb))
 			sowakeup_aio(so, sb);

Modified: head/tests/sys/aio/aio_test.c
==============================================================================
--- head/tests/sys/aio/aio_test.c	Tue May 24 03:08:32 2016	(r300555)
+++ head/tests/sys/aio/aio_test.c	Tue May 24 03:13:27 2016	(r300556)
@@ -781,6 +781,70 @@ ATF_TC_BODY(aio_socket_two_reads, tc)
 	close(s[0]);
 }
 
+/*
+ * This test ensures that aio_write() on a blocking socket of a "large"
+ * buffer does not return a short completion.
+ */
+ATF_TC_WITHOUT_HEAD(aio_socket_blocking_short_write);
+ATF_TC_BODY(aio_socket_blocking_short_write, tc)
+{
+	struct aiocb iocb, *iocbp;
+	char *buffer[2];
+	ssize_t done;
+	int buffer_size, sb_size;
+	socklen_t len;
+	int s[2];
+
+	ATF_REQUIRE_KERNEL_MODULE("aio");
+
+	ATF_REQUIRE(socketpair(PF_UNIX, SOCK_STREAM, 0, s) != -1);
+
+	len = sizeof(sb_size);
+	ATF_REQUIRE(getsockopt(s[0], SOL_SOCKET, SO_RCVBUF, &sb_size, &len) !=
+	    -1);
+	ATF_REQUIRE(len == sizeof(sb_size));
+	buffer_size = sb_size;
+
+	ATF_REQUIRE(getsockopt(s[1], SOL_SOCKET, SO_SNDBUF, &sb_size, &len) !=
+	    -1);
+	ATF_REQUIRE(len == sizeof(sb_size));
+	if (sb_size > buffer_size)
+		buffer_size = sb_size;
+
+	/*
+	 * Use twice the size of the MAX(receive buffer, send buffer)
+	 * to ensure that the write is split up into multiple writes
+	 * internally.
+	 */
+	buffer_size *= 2;
+
+	buffer[0] = malloc(buffer_size);
+	ATF_REQUIRE(buffer[0] != NULL);
+	buffer[1] = malloc(buffer_size);
+	ATF_REQUIRE(buffer[1] != NULL);
+
+	srandomdev();
+	aio_fill_buffer(buffer[1], buffer_size, random());
+
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.aio_fildes = s[1];
+	iocb.aio_buf = buffer[1];
+	iocb.aio_nbytes = buffer_size;
+	ATF_REQUIRE(aio_write(&iocb) == 0);
+
+	done = recv(s[0], buffer[0], buffer_size, MSG_WAITALL);
+	ATF_REQUIRE(done == buffer_size);
+
+	done = aio_waitcomplete(&iocbp, NULL);
+	ATF_REQUIRE(iocbp == &iocb);
+	ATF_REQUIRE(done == buffer_size);
+
+	ATF_REQUIRE(memcmp(buffer[0], buffer[1], buffer_size) == 0);
+
+	close(s[1]);
+	close(s[0]);
+}
+
 ATF_TP_ADD_TCS(tp)
 {
 
@@ -792,6 +856,7 @@ ATF_TP_ADD_TCS(tp)
 	ATF_TP_ADD_TC(tp, aio_md_test);
 	ATF_TP_ADD_TC(tp, aio_large_read_test);
 	ATF_TP_ADD_TC(tp, aio_socket_two_reads);
+	ATF_TP_ADD_TC(tp, aio_socket_blocking_short_write);
 
 	return (atf_no_error());
 }
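
For readers following the new control flow, the completion decision that
soaio_process_job() makes after this change can be modeled in userland.
The sketch below is not kernel code and is only an approximation: struct
model_job, enum job_action, and model_process() are hypothetical names
introduced for illustration, the soaio_ready() retry and the cancellation
path are left out, and ERESTART is omitted because it is a kernel-internal
error code.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical outcome of one processing pass; not a kernel type. */
enum job_action {
	JOB_REQUEUE,	/* put back at the head of the queue, retry later */
	JOB_COMPLETE,	/* report 'done' bytes with no error */
	JOB_FAIL	/* report -1 together with the error */
};

/* Hypothetical stand-in for the fields of the AIO job used here. */
struct model_job {
	size_t nbytes;	/* total request size (aio_nbytes) */
	size_t done;	/* bytes completed so far (the 'status' field) */
};

/*
 * Account for one transfer attempt that moved 'moved' bytes and
 * returned 'error', then decide what happens to the job.
 */
static enum job_action
model_process(struct model_job *job, size_t moved, int error,
    bool nonblocking)
{

	/* A pass can never move more than what is still outstanding. */
	assert(moved <= job->nbytes - job->done);
	job->done += moved;

	if (error == EWOULDBLOCK) {
		/*
		 * Requeue when nothing has completed yet, or when the
		 * socket is blocking.  Only a non-blocking socket with
		 * partial progress falls through to a short completion.
		 */
		if (job->done == 0 || !nonblocking)
			return (JOB_REQUEUE);
	}

	/* Partial progress hides a transient error from the caller. */
	if (job->done != 0 && (error == EINTR || error == EWOULDBLOCK))
		error = 0;

	return (error != 0 ? JOB_FAIL : JOB_COMPLETE);
}
```

For a 100-byte request on a blocking socket, a first pass that moves 40
bytes and returns EWOULDBLOCK yields JOB_REQUEUE with done == 40, and a
later pass that moves the remaining 60 bytes yields JOB_COMPLETE.  The
same first pass on a non-blocking socket yields JOB_COMPLETE with a short
count of 40.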