Date: Tue, 15 Dec 2020 06:31:46 +0000 (UTC)
From: Alan Somers <asomers@FreeBSD.org>
To: src-committers@freebsd.org, svn-src-projects@freebsd.org
Subject: svn commit: r368653 - in projects/aio_writev: sys/kern tests/sys/aio
Message-ID: <202012150631.0BF6VkZN072836@repo.freebsd.org>
next in thread | raw e-mail | index | archive | help
Author: asomers Date: Tue Dec 15 06:31:46 2020 New Revision: 368653 URL: https://svnweb.freebsd.org/changeset/base/368653 Log: aio_writev and aio_readv work on sockets now Modified: projects/aio_writev/sys/kern/sys_socket.c projects/aio_writev/sys/kern/vfs_aio.c projects/aio_writev/tests/sys/aio/aio_test.c Modified: projects/aio_writev/sys/kern/sys_socket.c ============================================================================== --- projects/aio_writev/sys/kern/sys_socket.c Tue Dec 15 01:45:19 2020 (r368652) +++ projects/aio_writev/sys/kern/sys_socket.c Tue Dec 15 06:31:46 2020 (r368653) @@ -459,7 +459,26 @@ static int soaio_lifetime; SYSCTL_INT(_kern_ipc_aio, OID_AUTO, lifetime, CTLFLAG_RW, &soaio_lifetime, 0, "Maximum lifetime for idle aiod"); +/* Advance the cursor in a uio by n bytes */ static void +soaio_advance_uio(struct uio *uio, int n) +{ + while (n > 0) { + int m = MIN(n, uio->uio_iov[0].iov_len); + n -= m; + uio->uio_iov[0].iov_base = (char*)uio->uio_iov[0].iov_base + m; + uio->uio_iov[0].iov_len -= m; + MPASS(uio->uio_resid >= m); + uio->uio_resid -= m; + if (uio->uio_iov[0].iov_len == 0) { + MPASS(uio->uio_iovcnt >= 1); + uio->uio_iov++; + uio->uio_iovcnt--; + } + } +} + +static void soaio_kproc_loop(void *arg) { struct proc *p; @@ -600,30 +619,48 @@ soaio_process_job(struct socket *so, struct sockbuf *s struct ucred *td_savedcred; struct thread *td; struct file *fp; - struct uio uio; + struct uio uio, *auiop; struct iovec iov; - size_t cnt, done; + size_t cnt, done, job_total_nbytes; long ru_before; - int error, flags; + int error, flags, opcode; + bool vectored; SOCKBUF_UNLOCK(sb); aio_switch_vmspace(job); td = curthread; fp = job->fd_file; + opcode = job->uaiocb.aio_lio_opcode; + vectored = opcode == LIO_WRITEV || opcode == LIO_READV; retry: td_savedcred = td->td_ucred; - td->td_ucred = job->cred; + if (vectored) { + error = copyinuio(job->uaiocb.aio_iov, job->uaiocb.aio_iovcnt, + &auiop); + if (error) { + aio_complete(job, -1, error); + 
SOCKBUF_LOCK(sb); + return; + } + } else { + iov.iov_base = (void *)((uintptr_t)job->uaiocb.aio_buf); + uio.uio_resid = job->uaiocb.aio_nbytes; + iov.iov_len = uio.uio_resid; + uio.uio_iov = &iov; + uio.uio_iovcnt = 1; + uio.uio_segflg = UIO_USERSPACE; + auiop = &uio; + } + job_total_nbytes = auiop->uio_resid; + auiop->uio_offset = 0; + auiop->uio_td = td; + + td->td_ucred = job->cred; done = job->aio_done; - cnt = job->uaiocb.aio_nbytes - done; - iov.iov_base = (void *)((uintptr_t)job->uaiocb.aio_buf + done); - iov.iov_len = cnt; - uio.uio_iov = &iov; - uio.uio_iovcnt = 1; - uio.uio_offset = 0; - uio.uio_resid = cnt; - uio.uio_segflg = UIO_USERSPACE; - uio.uio_td = td; + soaio_advance_uio(auiop, done); + cnt = auiop->uio_resid; + MPASS(cnt == job_total_nbytes - done); flags = MSG_NBIO; /* @@ -633,26 +670,26 @@ retry: */ if (sb == &so->so_rcv) { - uio.uio_rw = UIO_READ; + auiop->uio_rw = UIO_READ; ru_before = td->td_ru.ru_msgrcv; #ifdef MAC error = mac_socket_check_receive(fp->f_cred, so); if (error == 0) #endif - error = soreceive(so, NULL, &uio, NULL, NULL, &flags); + error = soreceive(so, NULL, auiop, NULL, NULL, &flags); if (td->td_ru.ru_msgrcv != ru_before) job->msgrcv = 1; } else { if (!TAILQ_EMPTY(&sb->sb_aiojobq)) flags |= MSG_MORETOCOME; - uio.uio_rw = UIO_WRITE; + auiop->uio_rw = UIO_WRITE; ru_before = td->td_ru.ru_msgsnd; #ifdef MAC error = mac_socket_check_send(fp->f_cred, so); if (error == 0) #endif - error = sosend(so, NULL, &uio, NULL, NULL, flags, td); + error = sosend(so, NULL, auiop, NULL, NULL, flags, td); if (td->td_ru.ru_msgsnd != ru_before) job->msgsnd = 1; if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) { @@ -662,7 +699,7 @@ retry: } } - done += cnt - uio.uio_resid; + done += cnt - auiop->uio_resid; job->aio_done = done; td->td_ucred = td_savedcred; @@ -676,7 +713,7 @@ retry: * been made, requeue this request at the head of the * queue to try again when the socket is ready. 
*/ - MPASS(done != job->uaiocb.aio_nbytes); + MPASS(done != job_total_nbytes); SOCKBUF_LOCK(sb); if (done == 0 || !(so->so_state & SS_NBIO)) { empty_results++; @@ -696,7 +733,7 @@ retry: } else { TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list); } - return; + goto out; } SOCKBUF_UNLOCK(sb); } @@ -708,6 +745,10 @@ retry: else aio_complete(job, done, 0); SOCKBUF_LOCK(sb); + +out: + if (vectored) + free(auiop, M_IOV); } static void @@ -782,10 +823,10 @@ soo_aio_cancel(struct kaiocb *job) so = job->fd_file->f_data; opcode = job->uaiocb.aio_lio_opcode; - if (opcode == LIO_READ) + if (opcode == LIO_READ || opcode == LIO_READV) sb = &so->so_rcv; else { - MPASS(opcode == LIO_WRITE); + MPASS(opcode == LIO_WRITE || opcode == LIO_WRITEV); sb = &so->so_snd; } @@ -817,9 +858,11 @@ soo_aio_queue(struct file *fp, struct kaiocb *job) switch (job->uaiocb.aio_lio_opcode) { case LIO_READ: + case LIO_READV: sb = &so->so_rcv; break; case LIO_WRITE: + case LIO_WRITEV: sb = &so->so_snd; break; default: Modified: projects/aio_writev/sys/kern/vfs_aio.c ============================================================================== --- projects/aio_writev/sys/kern/vfs_aio.c Tue Dec 15 01:45:19 2020 (r368652) +++ projects/aio_writev/sys/kern/vfs_aio.c Tue Dec 15 06:31:46 2020 (r368653) @@ -1695,8 +1695,6 @@ no_kqueue: error = 0; } else if (fp->f_ops->fo_aio_queue == NULL) error = aio_queue_file(fp, job); - else if (opcode == LIO_WRITEV || opcode == LIO_READV) - error = EOPNOTSUPP; else error = fo_aio_queue(fp, job); if (error) Modified: projects/aio_writev/tests/sys/aio/aio_test.c ============================================================================== --- projects/aio_writev/tests/sys/aio/aio_test.c Tue Dec 15 01:45:19 2020 (r368652) +++ projects/aio_writev/tests/sys/aio/aio_test.c Tue Dec 15 06:31:46 2020 (r368653) @@ -365,7 +365,7 @@ aio_readv_test(struct aio_context *ac, completion comp bzero(ac->ac_buffer, ac->ac_buflen); bzero(&aio, sizeof(aio)); - aio.aio_fildes = ac->ac_write_fd; + 
aio.aio_fildes = ac->ac_read_fd; aio.aio_offset = 0; len0 = ac->ac_buflen * 3 / 4; len1 = ac->ac_buflen / 4; @@ -543,17 +543,16 @@ aio_unix_socketpair_test(completion comp, struct sigev aio_context_init(&ac, sockets[0], sockets[1], UNIX_SOCKETPAIR_LEN); ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_before) != -1, "getrusage failed: %s", strerror(errno)); - if (vectored) + if (vectored) { aio_writev_test(&ac, comp, sev); - else + aio_readv_test(&ac, comp, sev); + } else { aio_write_test(&ac, comp, sev); + aio_read_test(&ac, comp, sev); + } ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_after) != -1, "getrusage failed: %s", strerror(errno)); ATF_REQUIRE(ru_after.ru_msgsnd == ru_before.ru_msgsnd + 1); - ru_before = ru_after; - aio_read_test(&ac, comp, sev); - ATF_REQUIRE_MSG(getrusage(RUSAGE_SELF, &ru_after) != -1, - "getrusage failed: %s", strerror(errno)); ATF_REQUIRE(ru_after.ru_msgrcv == ru_before.ru_msgrcv + 1); close(sockets[0]); @@ -1054,14 +1053,11 @@ ATF_TC_BODY(aio_socket_two_reads, tc) close(s[0]); } -/* - * This test ensures that aio_write() on a blocking socket of a "large" - * buffer does not return a short completion. 
- */ -ATF_TC_WITHOUT_HEAD(aio_socket_blocking_short_write); -ATF_TC_BODY(aio_socket_blocking_short_write, tc) +static void +aio_socket_blocking_short_write_test(bool vectored) { struct aiocb iocb, *iocbp; + struct iovec iov[2]; char *buffer[2]; ssize_t done; int buffer_size, sb_size; @@ -1101,9 +1097,19 @@ ATF_TC_BODY(aio_socket_blocking_short_write, tc) memset(&iocb, 0, sizeof(iocb)); iocb.aio_fildes = s[1]; - iocb.aio_buf = buffer[1]; - iocb.aio_nbytes = buffer_size; - ATF_REQUIRE(aio_write(&iocb) == 0); + if (vectored) { + iov[0].iov_base = buffer[1]; + iov[0].iov_len = buffer_size / 2 + 1; + iov[1].iov_base = buffer[1] + buffer_size / 2 + 1; + iov[1].iov_len = buffer_size / 2 - 1; + iocb.aio_iov = iov; + iocb.aio_iovcnt = 2; + ATF_REQUIRE(aio_writev(&iocb) == 0); + } else { + iocb.aio_buf = buffer[1]; + iocb.aio_nbytes = buffer_size; + ATF_REQUIRE(aio_write(&iocb) == 0); + } done = recv(s[0], buffer[0], buffer_size, MSG_WAITALL); ATF_REQUIRE(done == buffer_size); @@ -1119,6 +1125,26 @@ ATF_TC_BODY(aio_socket_blocking_short_write, tc) } /* + * This test ensures that aio_write() on a blocking socket of a "large" + * buffer does not return a short completion. + */ +ATF_TC_WITHOUT_HEAD(aio_socket_blocking_short_write); +ATF_TC_BODY(aio_socket_blocking_short_write, tc) +{ + aio_socket_blocking_short_write_test(false); +} + +/* + * Like aio_socket_blocking_short_write, but also tests that partially + * completed vectored sends can be retried correctly. + */ +ATF_TC_WITHOUT_HEAD(aio_socket_blocking_short_write_vectored); +ATF_TC_BODY(aio_socket_blocking_short_write_vectored, tc) +{ + aio_socket_blocking_short_write_test(true); +} + +/* * This test verifies that cancelling a partially completed socket write * returns a short write rather than ECANCELED. 
*/ @@ -1686,6 +1712,7 @@ ATF_TP_ADD_TCS(tp) ATF_TP_ADD_TC(tp, aio_large_read_test); ATF_TP_ADD_TC(tp, aio_socket_two_reads); ATF_TP_ADD_TC(tp, aio_socket_blocking_short_write); + ATF_TP_ADD_TC(tp, aio_socket_blocking_short_write_vectored); ATF_TP_ADD_TC(tp, aio_socket_short_write_cancel); ATF_TP_ADD_TC(tp, aio_writev_dos_iov_len); ATF_TP_ADD_TC(tp, aio_writev_dos_iovcnt);
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?202012150631.0BF6VkZN072836>