From owner-freebsd-hackers@FreeBSD.ORG Wed Nov 5 17:10:27 2008 Return-Path: Delivered-To: freebsd-hackers@freebsd.org Received: from mx1.freebsd.org (mx1.freebsd.org [IPv6:2001:4f8:fff6::34]) by hub.freebsd.org (Postfix) with ESMTP id 494CB106564A for ; Wed, 5 Nov 2008 17:10:27 +0000 (UTC) (envelope-from neldredge@math.ucsd.edu) Received: from euclid.ucsd.edu (euclid.ucsd.edu [132.239.145.52]) by mx1.freebsd.org (Postfix) with ESMTP id 2E7AC8FC1B for ; Wed, 5 Nov 2008 17:10:26 +0000 (UTC) (envelope-from neldredge@math.ucsd.edu) Received: from zeno.ucsd.edu (zeno.ucsd.edu [132.239.145.22]) by euclid.ucsd.edu (8.11.7p3+Sun/8.11.7) with ESMTP id mA5HAQ614477; Wed, 5 Nov 2008 09:10:26 -0800 (PST) Received: from localhost (neldredg@localhost) by zeno.ucsd.edu (8.11.7p3+Sun/8.11.7) with ESMTP id mA5HAQk11810; Wed, 5 Nov 2008 09:10:26 -0800 (PST) X-Authentication-Warning: zeno.ucsd.edu: neldredg owned process doing -bs Date: Wed, 5 Nov 2008 09:10:26 -0800 (PST) From: Nate Eldredge X-X-Sender: neldredg@zeno.ucsd.edu To: rihad In-Reply-To: <4911A23B.7050104@mail.ru> Message-ID: References: <4911A23B.7050104@mail.ru> MIME-Version: 1.0 Content-Type: TEXT/PLAIN; charset=US-ASCII; format=flowed Cc: freebsd-hackers@freebsd.org Subject: Re: Asynchronous pipe I/O X-BeenThere: freebsd-hackers@freebsd.org X-Mailman-Version: 2.1.5 Precedence: list List-Id: Technical Discussions relating to FreeBSD List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , X-List-Received-Date: Wed, 05 Nov 2008 17:10:27 -0000 On Wed, 5 Nov 2008, rihad wrote: > Imagine this shell pipeline: > > sh prog1 | sh prog2 > > > As given above, prog1 blocks if prog2 hasn't yet read previously written > data (actually, newline separated commands) or is busy. What I want is > for prog1 to never block: > > sh prog1 | buffer | sh prog2 [and misc/buffer is unsuitable] I found an old piece of code laying around that I wrote for this purpose. Looking at it, I can see a number of inefficiencies, but it might do in a pinch. You're welcome to use it; I hereby release it to the public domain. Another hack that you could use, if you don't mind storing the buffer on disk rather than memory, is sh prog1 > tmpfile & tail -f -c +0 tmpfile | sh prog2 Here's my program. /* Buffering filter. */ #include #include #include #include #include #include /* Size of a single buffer. */ #define BUFSIZE 512 struct buffer { struct buffer *next; size_t length; unsigned char buf[BUFSIZE]; }; struct buffer *reader; struct buffer *writer; int max_mem = 100 * 1024; int current_mem; #define OK 0 #define WAIT 1 #define GIVEUP 2 int read_one (int fd) { int result; if (current_mem > (max_mem - sizeof(*reader->next))) { fprintf(stderr, "Reached max_mem!\n"); return WAIT; } /* Get a new buffer. */ reader->next = malloc(sizeof(*reader->next)); if (reader->next) { current_mem += sizeof(*reader->next); fprintf(stderr, "\rReading: \t%u bytes in buffer ", current_mem); } else { fprintf(stderr, "Virtual memory exhausted\n"); return WAIT; } reader = reader->next; reader->next = NULL; result = read(fd, reader->buf, BUFSIZE); if (result > 0) reader->length = result; else if (result == 0) { fprintf(stderr, "Hit EOF on reader\n"); return GIVEUP; } else if (result < 0) { fprintf(stderr, "Error on reader: %s\n", strerror(errno)); return GIVEUP; } return OK; } int write_one (int fd) { struct buffer *newwriter; if (reader == writer) return WAIT; /* the reader owns the last buffer */ if (writer->length > 0) { int result; result = write(fd, writer->buf, writer->length); if (result == 0) { fprintf(stderr, "Hit EOF on writer\n"); return GIVEUP; } else if (result < 0) { fprintf(stderr, "Error on writer: %s\n", strerror(errno)); return GIVEUP; } } newwriter = writer->next; free(writer); current_mem -= sizeof(*writer); fprintf(stderr, "\rWriting: \t%u bytes in buffer ", current_mem); writer = newwriter; return OK; } void move_data(int in_fd, int out_fd) { int reader_state = OK; int writer_state = OK; int maxfd = ((in_fd > out_fd) ? in_fd : out_fd) + 1; reader = malloc(sizeof(*reader)); if (!reader) { fprintf(stderr, "No memory at all!\n"); return; } reader->next = NULL; reader->length = 0; writer = reader; current_mem = sizeof(*reader); while (1) /* break when done */ { int result; fd_set read_set, write_set; FD_ZERO(&read_set); FD_ZERO(&write_set); if (reader_state == OK) FD_SET(in_fd, &read_set); if (writer_state == OK) FD_SET(out_fd, &write_set); result = select(maxfd, &read_set, &write_set, NULL, NULL); /* If we're ready to do something, do it. Also let the other end get a chance if something changed. */ if (FD_ISSET(in_fd, &read_set)) { reader_state = read_one(in_fd); if (writer_state == WAIT) writer_state = OK; } if (FD_ISSET(out_fd, &write_set)) { writer_state = write_one(out_fd); if (reader_state == WAIT) reader_state = OK; } /* Check for termination */ if (writer_state == GIVEUP) break; /* can't write any more */ if (reader_state == GIVEUP && writer_state == WAIT) break; /* can't read any more, and wrote all we have */ } } int main(void) { move_data(0, 1); return 0; } -- Nate Eldredge neldredge@math.ucsd.edu