Date: Wed, 5 Nov 2008 09:10:26 -0800 (PST) From: Nate Eldredge <neldredge@math.ucsd.edu> To: rihad <rihad@mail.ru> Cc: freebsd-hackers@freebsd.org Subject: Re: Asynchronous pipe I/O Message-ID: <Pine.GSO.4.64.0811050857000.1597@zeno.ucsd.edu> In-Reply-To: <4911A23B.7050104@mail.ru> References: <4911A23B.7050104@mail.ru>
next in thread | previous in thread | raw e-mail | index | archive | help
On Wed, 5 Nov 2008, rihad wrote: > Imagine this shell pipeline: > > sh prog1 | sh prog2 > > > As given above, prog1 blocks if prog2 hasn't yet read previously written > data (actually, newline separated commands) or is busy. What I want is > for prog1 to never block: > > sh prog1 | buffer | sh prog2 [and misc/buffer is unsuitable] I found an old piece of code laying around that I wrote for this purpose. Looking at it, I can see a number of inefficiencies, but it might do in a pinch. You're welcome to use it; I hereby release it to the public domain. Another hack that you could use, if you don't mind storing the buffer on disk rather than memory, is sh prog1 > tmpfile & tail -f -c +0 tmpfile | sh prog2 Here's my program. /* Buffering filter. */ #include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <stdlib.h> #include <errno.h> #include <string.h> /* Size of a single buffer. */ #define BUFSIZE 512 struct buffer { struct buffer *next; size_t length; unsigned char buf[BUFSIZE]; }; struct buffer *reader; struct buffer *writer; int max_mem = 100 * 1024; int current_mem; #define OK 0 #define WAIT 1 #define GIVEUP 2 int read_one (int fd) { int result; if (current_mem > (max_mem - sizeof(*reader->next))) { fprintf(stderr, "Reached max_mem!\n"); return WAIT; } /* Get a new buffer. */ reader->next = malloc(sizeof(*reader->next)); if (reader->next) { current_mem += sizeof(*reader->next); fprintf(stderr, "\rReading: \t%u bytes in buffer ", current_mem); } else { fprintf(stderr, "Virtual memory exhausted\n"); return WAIT; } reader = reader->next; reader->next = NULL; result = read(fd, reader->buf, BUFSIZE); if (result > 0) reader->length = result; else if (result == 0) { fprintf(stderr, "Hit EOF on reader\n"); return GIVEUP; } else if (result < 0) { fprintf(stderr, "Error on reader: %s\n", strerror(errno)); return GIVEUP; } return OK; } int write_one (int fd) { struct buffer *newwriter; if (reader == writer) return WAIT; /* the reader owns the last buffer */ if (writer->length > 0) { int result; result = write(fd, writer->buf, writer->length); if (result == 0) { fprintf(stderr, "Hit EOF on writer\n"); return GIVEUP; } else if (result < 0) { fprintf(stderr, "Error on writer: %s\n", strerror(errno)); return GIVEUP; } } newwriter = writer->next; free(writer); current_mem -= sizeof(*writer); fprintf(stderr, "\rWriting: \t%u bytes in buffer ", current_mem); writer = newwriter; return OK; } void move_data(int in_fd, int out_fd) { int reader_state = OK; int writer_state = OK; int maxfd = ((in_fd > out_fd) ? in_fd : out_fd) + 1; reader = malloc(sizeof(*reader)); if (!reader) { fprintf(stderr, "No memory at all!\n"); return; } reader->next = NULL; reader->length = 0; writer = reader; current_mem = sizeof(*reader); while (1) /* break when done */ { int result; fd_set read_set, write_set; FD_ZERO(&read_set); FD_ZERO(&write_set); if (reader_state == OK) FD_SET(in_fd, &read_set); if (writer_state == OK) FD_SET(out_fd, &write_set); result = select(maxfd, &read_set, &write_set, NULL, NULL); /* If we're ready to do something, do it. Also let the other end get a chance if something changed. */ if (FD_ISSET(in_fd, &read_set)) { reader_state = read_one(in_fd); if (writer_state == WAIT) writer_state = OK; } if (FD_ISSET(out_fd, &write_set)) { writer_state = write_one(out_fd); if (reader_state == WAIT) reader_state = OK; } /* Check for termination */ if (writer_state == GIVEUP) break; /* can't write any more */ if (reader_state == GIVEUP && writer_state == WAIT) break; /* can't read any more, and wrote all we have */ } } int main(void) { move_data(0, 1); return 0; } -- Nate Eldredge neldredge@math.ucsd.edu
Want to link to this message? Use this URL: <https://mail-archive.FreeBSD.org/cgi/mid.cgi?Pine.GSO.4.64.0811050857000.1597>