| plan 9 kernel history: overview | file list | diff list |
2001/0203/port/qio.c (diff list | history)
| port/qio.c on 1993/0526 | ||
| 1993/0526 | #include "u.h" #include "../port/lib.h" #include "mem.h" #include "dat.h" #include "fns.h" #include "../port/error.h" | |
| 1997/0327 | static ulong padblockcnt; static ulong concatblockcnt; static ulong pullupblockcnt; static ulong copyblockcnt; static ulong consumecnt; static ulong producecnt; static ulong qcopycnt; | |
| 1998/0605 | static int debugging; | |
| 1994/0323 | ||
| 1997/1105 | #define QDEBUG if(0) | |
| 1994/0902 | ||
| 1993/0526 | /* | |
| 1993/0530 | * IO queues */ typedef struct Queue Queue; struct Queue { Lock; | |
| 1995/0125 | Block* bfirst; /* buffer */ | |
| 1994/0321 | Block* blast; | |
| 1993/0530 | ||
| 1998/0922 | int len; /* bytes allocated to queue */ int dlen; /* data bytes in queue */ | |
| 1993/0530 | int limit; /* max bytes in queue */ | |
| 1994/0902 | int inilim; /* initial limit */ | |
| 1993/0530 | int state; | |
| 1994/0902 | int noblock; /* true if writes return immediately when q full */ | |
| 1993/0908 | int eof; /* number of eofs read by user */ | |
| 1993/0530 | void (*kick)(void*); /* restart output */ | |
| 1994/0321 | void* arg; /* argument to kick */ | |
| 1993/0530 | QLock rlock; /* mutex for reading processes */ Rendez rr; /* process waiting to read */ QLock wlock; /* mutex for writing processes */ Rendez wr; /* process waiting to write */ | |
| 1997/0327 | char err[ERRLEN]; | |
| 1993/0530 | }; enum { | |
| 1997/0327 | /* Queue.state */ | |
| 1994/0507 | Qstarve = (1<<0), /* consumer starved */ Qmsg = (1<<1), /* message stream */ Qclosed = (1<<2), Qflow = (1<<3), | |
| 2001/0127 | Qcoalesce = (1<<4), /* coallesce packets on read */ | |
| 1997/0925 | ||
| 1999/0219 | Maxatomic = 32*1024, | |
| 1993/0530 | }; | |
| 1993/0811 | void | |
| 1997/0925 | ixsummary(void) { | |
| 1998/0605 | debugging ^= 1; | |
| 1999/0527 | iallocsummary(); | |
| 1997/0925 | print("pad %lud, concat %lud, pullup %lud, copy %lud\n", padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt); print("consume %lud, produce %lud, qcopy %lud\n", consumecnt, producecnt, qcopycnt); | |
| 1993/0526 | } | |
| 1997/0327 | /* * free a list of blocks */ | |
| 1993/0526 | void | |
| 1997/0327 | freeblist(Block *b) { Block *next; for(; b != 0; b = next){ next = b->next; b->next = 0; freeb(b); } } /* * pad a block to the front (or the back if size is negative) */ Block* padblock(Block *bp, int size) { int n; Block *nbp; | |
| 1998/0605 | QDEBUG checkb(bp, "padblock 1"); | |
| 1997/0327 | if(size >= 0){ if(bp->rp - bp->base >= size){ bp->rp -= size; return bp; } | |
| 1998/0605 | if(bp->next) | |
| 1999/0501 | panic("padblock 0x%uX", getcallerpc(&bp)); | |
| 1997/0327 | n = BLEN(bp); | |
| 1997/0925 | padblockcnt++; | |
| 1997/0327 | nbp = allocb(size+n); nbp->rp += size; nbp->wp = nbp->rp; memmove(nbp->wp, bp->rp, n); nbp->wp += n; freeb(bp); nbp->rp -= size; } else { size = -size; | |
| 1999/0311 | if(bp->next) | |
| 1999/0501 | panic("padblock 0x%uX", getcallerpc(&bp)); | |
| 1999/0311 | ||
| 1997/0327 | if(bp->lim - bp->wp >= size) return bp; n = BLEN(bp); | |
| 1997/0925 | padblockcnt++; | |
| 1997/0327 | nbp = allocb(size+n); memmove(nbp->wp, bp->rp, n); nbp->wp += n; freeb(bp); } | |
| 1998/0605 | QDEBUG checkb(nbp, "padblock 1"); | |
| 1997/0327 | return nbp; } /* * return count of bytes in a string of blocks */ int blocklen(Block *bp) { int len; len = 0; while(bp) { len += BLEN(bp); bp = bp->next; } return len; } /* * copy the string of blocks into * a single block and free the string */ Block* concatblock(Block *bp) { int len; Block *nb, *f; if(bp->next == 0) return bp; nb = allocb(blocklen(bp)); for(f = bp; f; f = f->next) { len = BLEN(f); memmove(nb->wp, f->rp, len); nb->wp += len; } concatblockcnt += BLEN(nb); freeblist(bp); | |
| 1998/0605 | QDEBUG checkb(nb, "concatblock 1"); | |
| 1997/0327 | return nb; } /* * make sure the first block has at least n bytes */ Block* pullupblock(Block *bp, int n) { int i; Block *nbp; /* * this should almost always be true, it's * just to avoid every caller checking. */ if(BLEN(bp) >= n) return bp; /* * if not enough room in the first block, * add another to the front of the list. */ if(bp->lim - bp->rp < n){ nbp = allocb(n); nbp->next = bp; bp = nbp; } /* * copy bytes from the trailing blocks into the first */ n -= BLEN(bp); while(nbp = bp->next){ i = BLEN(nbp); if(i > n) { memmove(bp->wp, nbp->rp, n); | |
| 1997/0925 | pullupblockcnt++; | |
| 1997/0327 | bp->wp += n; nbp->rp += n; | |
| 1998/0605 | QDEBUG checkb(bp, "pullupblock 1"); | |
| 1997/0327 | return bp; } else { memmove(bp->wp, nbp->rp, i); | |
| 1997/0925 | pullupblockcnt++; | |
| 1997/0327 | bp->wp += i; bp->next = nbp->next; nbp->next = 0; freeb(nbp); n -= i; | |
| 1998/0605 | if(n == 0){ QDEBUG checkb(bp, "pullupblock 2"); | |
| 1997/0327 | return bp; | |
| 1998/0605 | } | |
| 1997/0327 | } } freeb(bp); return 0; } /* * trim to len bytes starting at offset */ Block * trimblock(Block *bp, int offset, int len) { ulong l; Block *nb, *startb; | |
| 1998/0605 | QDEBUG checkb(bp, "trimblock 1"); | |
| 1997/0327 | if(blocklen(bp) < offset+len) { freeblist(bp); return nil; } while((l = BLEN(bp)) < offset) { offset -= l; nb = bp->next; bp->next = nil; freeb(bp); bp = nb; } startb = bp; bp->rp += offset; while((l = BLEN(bp)) < len) { len -= l; bp = bp->next; } bp->wp -= (BLEN(bp) - len); if(bp->next) { freeblist(bp->next); bp->next = nil; } return startb; } /* * copy 'count' bytes into a new block */ Block* copyblock(Block *bp, int count) { int l; Block *nbp; | |
| 1998/0605 | QDEBUG checkb(bp, "copyblock 0"); | |
| 1997/0327 | nbp = allocb(count); for(; count > 0 && bp != 0; bp = bp->next){ l = BLEN(bp); if(l > count) l = count; memmove(nbp->wp, bp->rp, l); nbp->wp += l; count -= l; } if(count > 0){ memset(nbp->wp, 0, count); nbp->wp += count; } | |
| 1997/0925 | copyblockcnt++; | |
| 1998/0605 | QDEBUG checkb(nbp, "copyblock 1"); | |
| 1997/0327 | return nbp; } | |
| 1997/0925 | Block* adjustblock(Block* bp, int len) { int n; | |
| 1997/1007 | Block *nbp; | |
| 1997/0925 | if(len < 0){ freeb(bp); return nil; } | |
| 1997/1007 | if(bp->rp+len > bp->lim){ nbp = copyblock(bp, len); freeblist(bp); | |
| 1998/0605 | QDEBUG checkb(nbp, "adjustblock 1"); | |
| 1997/1007 | return nbp; } | |
| 1997/0925 | n = BLEN(bp); if(len > n) memset(bp->wp, 0, len-n); bp->wp = bp->rp+len; | |
| 1998/0605 | QDEBUG checkb(bp, "adjustblock 2"); | |
| 1997/0925 | return bp; } | |
| 1997/0327 | /* * throw away up to count bytes from a * list of blocks. Return count of bytes * thrown away. */ int pullblock(Block **bph, int count) { Block *bp; int n, bytes; bytes = 0; if(bph == nil) return 0; while(*bph != nil && count != 0) { bp = *bph; n = BLEN(bp); if(count < n) n = count; bytes += n; count -= n; bp->rp += n; | |
| 1998/0605 | QDEBUG checkb(bp, "pullblock "); | |
| 1997/0327 | if(BLEN(bp) == 0) { *bph = bp->next; bp->next = nil; freeb(bp); } } return bytes; | |
| 1995/0902 | } /* | |
| 1997/0327 | * get next block from a queue, return null if nothing there | |
| 1995/0902 | */ Block* | |
| 1997/0327 | qget(Queue *q) | |
| 1995/0902 | { | |
| 1997/0327 | int dowakeup; Block *b; | |
| 1995/0902 | ||
| 1997/0327 | /* sync with qwrite */ ilock(q); | |
| 1995/0902 | ||
| 1997/0327 | b = q->bfirst; if(b == 0){ q->state |= Qstarve; iunlock(q); return 0; } q->bfirst = b->next; b->next = 0; | |
| 1998/0918 | q->len -= BALLOC(b); | |
| 1998/0922 | q->dlen -= BLEN(b); | |
| 1998/0605 | QDEBUG checkb(b, "qget"); | |
| 1995/1121 | ||
| 1997/0327 | /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; | |
| 1995/1121 | ||
| 1997/0327 | iunlock(q); if(dowakeup) wakeup(&q->wr); return b; } /* * throw away the next 'len' bytes in the queue */ void qdiscard(Queue *q, int len) { Block *b; int dowakeup, n, sofar; | |
| 1997/0410 | ilock(q); | |
| 1997/0327 | for(sofar = 0; sofar < len; sofar += n){ b = q->bfirst; if(b == nil) break; | |
| 1998/0605 | QDEBUG checkb(b, "qdiscard"); | |
| 1997/0327 | n = BLEN(b); if(n <= len - sofar){ q->bfirst = b->next; b->next = 0; | |
| 1998/0918 | q->len -= BALLOC(b); | |
| 1998/0922 | q->dlen -= BLEN(b); | |
| 1997/0327 | freeb(b); } else { n = len - sofar; b->rp += n; | |
| 1998/0922 | q->dlen -= n; | |
| 1997/0327 | } | |
| 1995/1121 | } | |
| 1997/0327 | /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; | |
| 1997/0410 | iunlock(q); | |
| 1997/0327 | if(dowakeup) wakeup(&q->wr); | |
| 1993/0526 | } /* | |
| 1994/0322 | * Interrupt level copy out of a queue, return # bytes copied. | |
| 1993/0526 | */ int | |
| 1993/0601 | qconsume(Queue *q, void *vp, int len) | |
| 1993/0526 | { Block *b; | |
| 1993/0528 | int n, dowakeup; | |
| 1993/0601 | uchar *p = vp; | |
| 1999/0115 | Block *tofree = nil; | |
| 1993/0526 | ||
| 1993/0527 | /* sync with qwrite */ | |
| 1997/0410 | ilock(q); | |
| 1993/0527 | ||
| 1994/1124 | for(;;) { b = q->bfirst; if(b == 0){ q->state |= Qstarve; | |
| 1997/0410 | iunlock(q); | |
| 1994/1124 | return -1; } QDEBUG checkb(b, "qconsume 1"); | |
| 1993/0528 | ||
| 1994/1124 | n = BLEN(b); if(n > 0) break; q->bfirst = b->next; | |
| 1998/0918 | q->len -= BALLOC(b); | |
| 1999/0115 | /* remember to free this */ b->next = tofree; tofree = b; | |
| 1994/1124 | }; | |
| 1993/0526 | if(n < len) len = n; memmove(p, b->rp, len); | |
| 1997/0327 | consumecnt += n; | |
| 1993/0527 | if((q->state & Qmsg) || len == n) | |
| 1993/0526 | q->bfirst = b->next; | |
| 1994/0222 | b->rp += len; | |
| 1998/0922 | q->dlen -= len; | |
| 1993/0526 | ||
| 1999/0115 | /* discard the block if we're done with it */ if((q->state & Qmsg) || len == n){ b->next = 0; q->len -= BALLOC(b); q->dlen -= BLEN(b); /* remember to free this */ b->next = tofree; tofree = b; } | |
| 1993/0528 | /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; } else dowakeup = 0; | |
| 1993/0526 | ||
| 1997/0410 | iunlock(q); | |
| 1993/0526 | ||
| 1993/0528 | if(dowakeup) wakeup(&q->wr); | |
| 1999/0115 | if(tofree != nil) freeblist(tofree); | |
| 1993/0526 | return len; } | |
| 1993/0528 | int | |
| 1994/0311 | qpass(Queue *q, Block *b) { | |
| 1998/0922 | int dlen, len, dowakeup; | |
| 1994/0311 | /* sync with qread */ dowakeup = 0; | |
| 1994/0323 | ilock(q); | |
| 1997/0404 | if(q->len >= q->limit){ | |
| 1998/0918 | freeblist(b); | |
| 1997/0404 | iunlock(q); return -1; } | |
| 1994/0311 | ||
| 1997/0327 | /* add buffer to queue */ | |
| 1994/0311 | if(q->bfirst) q->blast->next = b; else q->bfirst = b; | |
| 1998/0918 | len = BALLOC(b); | |
| 1998/0922 | dlen = BLEN(b); | |
| 1997/0327 | QDEBUG checkb(b, "qpass"); while(b->next){ b = b->next; QDEBUG checkb(b, "qpass"); | |
| 1998/0918 | len += BALLOC(b); | |
| 1998/0922 | dlen += BLEN(b); | |
| 1997/0327 | } | |
| 1994/0311 | q->blast = b; q->len += len; | |
| 1998/0922 | q->dlen += dlen; | |
| 1994/0311 | ||
| 1994/0418 | if(q->len >= q->limit/2) | |
| 1994/0324 | q->state |= Qflow; | |
| 1994/0311 | if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } | |
| 1994/0323 | iunlock(q); | |
| 1994/0311 | ||
| 1994/1117 | if(dowakeup) | |
| 1994/0311 | wakeup(&q->rr); return len; } int | |
| 1998/0918 | qpassnolim(Queue *q, Block *b) { | |
| 1998/0922 | int dlen, len, dowakeup; | |
| 1998/0918 | /* sync with qread */ dowakeup = 0; ilock(q); /* add buffer to queue */ if(q->bfirst) q->blast->next = b; else q->bfirst = b; len = BALLOC(b); | |
| 1998/0922 | dlen = BLEN(b); | |
| 1998/0918 | QDEBUG checkb(b, "qpass"); while(b->next){ b = b->next; QDEBUG checkb(b, "qpass"); len += BALLOC(b); | |
| 1998/0922 | dlen += BLEN(b); | |
| 1998/0918 | } q->blast = b; q->len += len; | |
| 1998/0922 | q->dlen += dlen; | |
| 1998/0918 | if(q->len >= q->limit/2) q->state |= Qflow; if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } iunlock(q); if(dowakeup) wakeup(&q->rr); return len; } /* * if the allocated space is way out of line with the used * space, reallocate to a smaller block */ Block* packblock(Block *bp) { | |
| 1998/0923 | Block **l, *nbp; int n; | |
| 1998/0918 | ||
| 1998/0923 | for(l = &bp; *l; l = &(*l)->next){ nbp = *l; n = BLEN(nbp); if((n<<2) < BALLOC(nbp)){ *l = allocb(n); memmove((*l)->wp, nbp->rp, n); (*l)->wp += n; (*l)->next = nbp->next; freeb(nbp); } | |
| 1998/0918 | } return bp; } int | |
| 1993/0601 | qproduce(Queue *q, void *vp, int len) | |
| 1993/0526 | { Block *b; | |
| 1995/0714 | int dowakeup; | |
| 1993/0601 | uchar *p = vp; | |
| 1993/0526 | ||
| 1993/0527 | /* sync with qread */ | |
| 1994/0208 | dowakeup = 0; | |
| 1997/0410 | ilock(q); | |
| 1993/0527 | ||
| 1993/0526 | /* no waiting receivers, room in buffer? */ if(q->len >= q->limit){ | |
| 1994/1116 | q->state |= Qflow; | |
| 1997/0410 | iunlock(q); | |
| 1993/0526 | return -1; } /* save in buffer */ | |
| 1994/0222 | b = iallocb(len); if(b == 0){ | |
| 1997/0410 | iunlock(q); | |
| 1994/1117 | return 0; | |
| 1993/0526 | } | |
| 1994/0222 | memmove(b->wp, p, len); | |
| 1997/0327 | producecnt += len; | |
| 1994/0222 | b->wp += len; if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; | |
| 1997/0327 | /* b->next = 0; done by iallocb() */ | |
| 1998/0918 | q->len += BALLOC(b); | |
| 1998/0922 | q->dlen += BLEN(b); | |
| 1994/0902 | QDEBUG checkb(b, "qproduce"); | |
| 1994/0222 | ||
| 1993/0528 | if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; | |
| 1994/0208 | } | |
| 1999/0115 | if(q->len >= q->limit) q->state |= Qflow; | |
| 1997/0410 | iunlock(q); | |
| 1993/0527 | ||
| 1994/1117 | if(dowakeup) | |
| 1993/0528 | wakeup(&q->rr); | |
| 1993/0526 | return len; } /* | |
| 1997/0327 | * copy from offset in the queue */ Block* qcopy(Queue *q, int len, ulong offset) { int sofar; int n; Block *b, *nb; uchar *p; nb = allocb(len); | |
| 1997/0410 | ilock(q); | |
| 1997/0327 | /* go to offset */ b = q->bfirst; for(sofar = 0; ; sofar += n){ if(b == nil){ | |
| 1997/0410 | iunlock(q); | |
| 1997/0327 | return nb; } n = BLEN(b); if(sofar + n > offset){ p = b->rp + offset - sofar; n -= offset - sofar; break; } | |
| 1998/0605 | QDEBUG checkb(b, "qcopy"); | |
| 1997/0327 | b = b->next; } /* copy bytes from there */ for(sofar = 0; sofar < len;){ if(n > len - sofar) n = len - sofar; memmove(nb->wp, p, n); qcopycnt += n; sofar += n; nb->wp += n; b = b->next; if(b == nil) break; n = BLEN(b); p = b->rp; } | |
| 1997/0410 | iunlock(q); | |
| 1997/0327 | return nb; } /* | |
| 1993/0526 | * called by non-interrupt code */ Queue* | |
| 1993/0530 | qopen(int limit, int msg, void (*kick)(void*), void *arg) | |
| 1993/0526 | { Queue *q; q = malloc(sizeof(Queue)); if(q == 0) | |
| 1993/0528 | return 0; | |
| 1998/0328 | ilock(q); | |
| 1994/0902 | q->limit = q->inilim = limit; | |
| 1993/0526 | q->kick = kick; q->arg = arg; | |
| 2001/0127 | q->state = 0; if(msg > 0) q->state |= Qmsg; else if(msg < 0) q->state |= Qcoalesce; | |
| 1993/0601 | q->state |= Qstarve; | |
| 1993/0908 | q->eof = 0; | |
| 1998/0703 | q->noblock = 0; | |
| 1998/0328 | iunlock(q); | |
| 1993/0526 | return q; } static int | |
| 1993/0528 | notempty(void *a) | |
| 1993/0526 | { | |
| 1993/0528 | Queue *q = a; | |
| 1993/0526 | ||
| 1994/0323 | return (q->state & Qclosed) || q->bfirst != 0; | |
| 1993/0526 | } | |
| 1993/0528 | /* | |
| 2001/0128 | * wait for the queue to be non-empty or closed. * called with q ilocked. | |
| 1993/0528 | */ | |
| 2001/0128 | static int qwait(Queue *q) | |
| 1993/0526 | { | |
| 1993/0528 | /* wait for data */ for(;;){ | |
| 2001/0203 | if(q->bfirst != nil) | |
| 1993/0728 | break; | |
| 1993/0528 | if(q->state & Qclosed){ | |
| 1993/0908 | if(++q->eof > 3) | |
| 2001/0128 | return -1; | |
| 1993/0528 | return 0; | |
| 1993/0527 | } | |
| 1995/0714 | q->state |= Qstarve; /* flag requesting producer to wake me */ iunlock(q); sleep(&q->rr, notempty, q); | |
| 2001/0128 | ilock(q); | |
| 1993/0526 | } | |
| 2001/0128 | return 1; } | |
| 1993/0526 | ||
| 2001/0128 | /* * called with q ilocked */ static Block* qremove(Queue *q) { Block *b; b = q->bfirst; if(b == nil) return nil; | |
| 1993/0528 | q->bfirst = b->next; | |
| 2001/0128 | b->next = nil; q->dlen -= BLEN(b); | |
| 1998/0918 | q->len -= BALLOC(b); | |
| 2001/0128 | QDEBUG checkb(b, "qremove"); return b; } | |
| 1993/0528 | ||
| 2001/0128 | /* * put a block back to the front of the queue * called with q ilocked */ static void qputback(Queue *q, Block *b) { b->next = q->bfirst; if(q->bfirst == nil) q->blast = b; q->bfirst = b; q->len += BALLOC(b); q->dlen += BLEN(b); } | |
| 1999/0115 | ||
| 2001/0128 | /* * flow control, get producer going again * called with q ilocked */ static void qwakeup_iunlock(Queue *q) { int dowakeup = 0; | |
| 1999/0115 | /* if writer flow controlled, restart */ if((q->state & Qflow) && q->len < q->limit/2){ q->state &= ~Qflow; dowakeup = 1; | |
| 2001/0128 | } | |
| 1993/0527 | ||
| 1995/0714 | iunlock(q); /* wakeup flow controlled writers */ | |
| 1994/0324 | if(dowakeup){ if(q->kick) | |
| 1997/0327 | q->kick(q->arg); | |
| 1993/0528 | wakeup(&q->wr); | |
| 1994/0324 | } | |
| 2001/0128 | } | |
| 1993/0528 | ||
| 2001/0128 | /* * get next block from a queue (up to a limit) */ Block* qbread(Queue *q, int len) { Block *b, *nb; int n; qlock(&q->rlock); if(waserror()){ qunlock(&q->rlock); nexterror(); } ilock(q); switch(qwait(q)){ case 0: /* queue closed */ iunlock(q); qunlock(&q->rlock); poperror(); return nil; case -1: /* multiple reads on a closed queue */ iunlock(q); error(q->err); } /* if we get here, there's at least one block in the queue */ b = qremove(q); n = BLEN(b); /* split block if it's too big and this is not a message queue */ nb = b; if(n > len){ if((q->state&Qmsg) == 0){ n -= len; b = allocb(n); memmove(b->wp, nb->rp+len, n); b->wp += n; qputback(q, b); } nb->wp = nb->rp + len; } /* restart producer */ qwakeup_iunlock(q); | |
| 1993/0527 | poperror(); | |
| 1993/0526 | qunlock(&q->rlock); | |
| 1995/0714 | return nb; | |
| 1993/0526 | } | |
| 1995/0714 | /* * read a queue. if no data is queued, post a Block * and wait on its Rendez. */ long qread(Queue *q, void *vp, int len) { | |
| 2001/0128 | Block *b, *first, *next, **l; | |
| 2001/0203 | int m; uchar *s, *e, *p; | |
| 1995/0714 | ||
| 2001/0203 | s = p = vp; e = s+len; | |
| 2001/0128 | qlock(&q->rlock); if(waserror()){ qunlock(&q->rlock); nexterror(); } | |
| 1995/0714 | ||
| 2001/0128 | ilock(q); again: switch(qwait(q)){ case 0: /* queue closed */ iunlock(q); qunlock(&q->rlock); poperror(); return 0; case -1: /* multiple reads on a closed queue */ iunlock(q); error(q->err); } | |
| 2001/0127 | ||
| 2001/0128 | /* if we get here, there's at least one block in the queue */ if(q->state & Qcoalesce){ /* when coalescing, 0 length blocks just go away */ | |
| 2001/0203 | b = q->bfirst; if(BLEN(b) <= 0){ | |
| 2001/0128 | freeb(qremove(q)); goto again; } /* grab the first block plus as many * following blocks as will completely * fit in the read. */ l = &first; | |
| 2001/0127 | m = BLEN(b); | |
| 2001/0128 | for(;;) { *l = qremove(q); l = &b->next; | |
| 2001/0203 | p += m; | |
| 2001/0128 | b = q->bfirst; if(b == nil) break; m = BLEN(b); | |
| 2001/0203 | if(p+m > e) | |
| 2001/0128 | break; } } else { first = qremove(q); | |
| 2001/0127 | } | |
| 2001/0128 | /* copy to user space outside of the ilock */ iunlock(q); | |
| 2001/0203 | p = s; | |
| 2001/0128 | for(b = first; b != nil; b = next){ | |
| 2001/0127 | m = BLEN(b); | |
| 2001/0203 | if(m > e - p){ m = e - p; | |
| 2001/0128 | memmove(p, b->rp, m); | |
| 2001/0203 | p += m; | |
| 2001/0128 | b->rp += m; break; } | |
| 2001/0127 | memmove(p, b->rp, m); | |
| 2001/0128 | p += m; next = b->next; b->next = nil; | |
| 2001/0127 | freeb(b); } | |
| 2001/0128 | ilock(q); | |
| 2001/0127 | ||
| 2001/0128 | /* take care of any left over partial block */ if(b != nil){ if(q->state & Qmsg) freeb(b); else qputback(q, b); } /* restart producer */ qwakeup_iunlock(q); poperror(); qunlock(&q->rlock); | |
| 2001/0203 | return p-s; | |
| 1995/0714 | } | |
| 1993/0528 | static int qnotfull(void *a) | |
| 1993/0526 | { | |
| 1993/0528 | Queue *q = a; | |
| 1993/0526 | ||
| 1994/0215 | return q->len < q->limit || (q->state & Qclosed); | |
| 1993/0528 | } | |
| 1993/0527 | ||
| 1993/0528 | /* | |
| 1995/0714 | * add a block to a queue obeying flow control | |
| 1993/0528 | */ long | |
| 1995/0714 | qbwrite(Queue *q, Block *b) | |
| 1993/0528 | { | |
| 1995/0714 | int n, dowakeup; | |
| 1993/0526 | ||
| 1994/0208 | dowakeup = 0; | |
| 1995/0714 | n = BLEN(b); | |
| 1998/0328 | qlock(&q->wlock); | |
| 1994/0208 | if(waserror()){ | |
| 2000/0913 | if(b != nil) freeb(b); | |
| 1994/0208 | qunlock(&q->wlock); nexterror(); | |
| 1994/0804 | } | |
| 1994/0208 | ||
| 2000/0914 | ilock(q); | |
| 1998/0328 | ||
| 2000/0914 | /* give up if the queue is closed */ if(q->state & Qclosed){ iunlock(q); error(q->err); } | |
| 1998/0328 | ||
| 2000/0914 | /* if nonblocking, don't queue over the limit */ if(q->len >= q->limit){ | |
| 1995/0714 | if(q->noblock){ | |
| 1998/0328 | iunlock(q); | |
| 1995/0714 | freeb(b); | |
| 1993/1227 | qunlock(&q->wlock); | |
| 1994/0208 | poperror(); | |
| 1995/0714 | return n; | |
| 1993/1227 | } | |
| 1993/0526 | } | |
| 2000/0914 | /* queue the block */ | |
| 1995/0714 | if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; | |
| 1997/0327 | b->next = 0; | |
| 1998/0918 | q->len += BALLOC(b); | |
| 1998/0922 | q->dlen += n; | |
| 1998/0605 | QDEBUG checkb(b, "qbwrite"); | |
| 2000/0913 | b = nil; | |
| 1995/0714 | ||
| 2000/0914 | /* make sure other end gets awakened */ | |
| 1995/0714 | if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; } iunlock(q); if(dowakeup){ if(q->kick) | |
| 1997/0327 | q->kick(q->arg); | |
| 1995/0714 | wakeup(&q->rr); } | |
| 2000/0914 | /* * flow control, wait for queue to get below the limit * before allowing the process to continue and queue * more. We do this here so that postnote can only * interrupt us after the data has been quued. This * means that things like 9p flushes and ssl messages * will not be disrupted by software interrupts. * * Note - this is moderately dangerous since a process * that keeps getting interrupted and rewriting will * queue infinite crud. */ for(;;){ if(q->noblock || qnotfull(q)) break; ilock(q); q->state |= Qflow; iunlock(q); sleep(&q->wr, qnotfull, q); } USED(b); | |
| 1995/0714 | qunlock(&q->wlock); poperror(); return n; } /* | |
| 1999/0219 | * write to a queue. only Maxatomic bytes at a time is atomic. | |
| 1995/0714 | */ | |
| 1997/0327 | int | |
| 1995/0714 | qwrite(Queue *q, void *vp, int len) { int n, sofar; Block *b; uchar *p = vp; | |
| 1997/0327 | QDEBUG if(islo() == 0) | |
| 1999/0501 | print("qwrite hi %lux\n", getcallerpc(&q)); | |
| 1995/0714 | sofar = 0; | |
| 1994/0208 | do { n = len-sofar; | |
| 1999/0219 | if(n > Maxatomic) n = Maxatomic; | |
| 1993/0528 | ||
| 1994/0208 | b = allocb(n); | |
| 1999/0714 | setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); | |
| 1994/0804 | if(waserror()){ freeb(b); nexterror(); } | |
| 1994/0208 | memmove(b->wp, p+sofar, n); | |
| 1995/0714 | poperror(); | |
| 1994/0208 | b->wp += n; | |
| 1994/0929 | ||
| 1995/0714 | qbwrite(q, b); | |
| 1994/0929 | ||
| 1994/0208 | sofar += n; } while(sofar < len && (q->state & Qmsg) == 0); | |
| 1993/0528 | ||
| 1993/0526 | return len; } | |
| 1993/0528 | /* | |
| 1995/0714 | * used by print() to write to a queue. Since we may be splhi or not in * a process, don't qlock. | |
| 1994/1124 | */ | |
| 1997/0327 | int | |
| 1994/1124 | qiwrite(Queue *q, void *vp, int len) { int n, sofar, dowakeup; Block *b; uchar *p = vp; dowakeup = 0; sofar = 0; do { n = len-sofar; | |
| 1999/0219 | if(n > Maxatomic) n = Maxatomic; | |
| 1994/1124 | b = allocb(n); memmove(b->wp, p+sofar, n); b->wp += n; ilock(q); QDEBUG checkb(b, "qiwrite"); | |
| 1995/0714 | if(q->bfirst) q->blast->next = b; else q->bfirst = b; q->blast = b; | |
| 1998/0918 | q->len += BALLOC(b); | |
| 1998/0922 | q->dlen += n; | |
| 1994/1124 | ||
| 1995/0714 | if(q->state & Qstarve){ q->state &= ~Qstarve; dowakeup = 1; | |
| 1994/1124 | } iunlock(q); if(dowakeup){ if(q->kick) | |
| 1997/0327 | q->kick(q->arg); | |
| 1994/1124 | wakeup(&q->rr); } sofar += n; } while(sofar < len && (q->state & Qmsg) == 0); return len; } /* | |
| 1997/0327 | * be extremely careful when calling this, * as there is no reference accounting */ void qfree(Queue *q) { qclose(q); free(q); } /* | |
| 1993/0528 | * Mark a queue as closed. No further IO is permitted. * All blocks are released. */ void qclose(Queue *q) | |
| 1993/0527 | { | |
| 1997/0327 | Block *bfirst; | |
| 1993/0527 | ||
| 1997/0327 | if(q == nil) return; | |
| 1993/0528 | /* mark it */ | |
| 1994/0323 | ilock(q); | |
| 1993/0528 | q->state |= Qclosed; | |
| 1997/0806 | q->state &= ~(Qflow|Qstarve); | |
| 1997/0327 | strcpy(q->err, Ehungup); | |
| 1993/0528 | bfirst = q->bfirst; q->bfirst = 0; | |
| 1994/0219 | q->len = 0; | |
| 1998/0929 | q->dlen = 0; | |
| 1994/0927 | q->noblock = 0; | |
| 1994/0323 | iunlock(q); | |
| 1993/0527 | ||
| 1993/0528 | /* free queued blocks */ | |
| 1997/0327 | freeblist(bfirst); | |
| 1993/0526 | ||
| 1993/0528 | /* wake up readers/writers */ wakeup(&q->rr); wakeup(&q->wr); } | |
| 1993/0527 | ||
| 1993/0528 | /* * Mark a queue as closed. Wakeup any readers. Don't remove queued * blocks. */ void | |
| 1997/0327 | qhangup(Queue *q, char *msg) | |
| 1993/0528 | { /* mark it */ | |
| 1994/0323 | ilock(q); | |
| 1993/0528 | q->state |= Qclosed; | |
| 1997/0327 | if(msg == 0 || *msg == 0) strcpy(q->err, Ehungup); else strncpy(q->err, msg, ERRLEN-1); | |
| 1994/0323 | iunlock(q); | |
| 1993/0527 | ||
| 1993/0528 | /* wake up readers/writers */ wakeup(&q->rr); wakeup(&q->wr); | |
| 1998/1127 | } /* * return non-zero if the q is hungup */ int qisclosed(Queue *q) { return q->state & Qclosed; | |
| 1993/0528 | } /* * mark a queue as no longer hung up */ void qreopen(Queue *q) { | |
| 1998/0328 | ilock(q); | |
| 1993/0528 | q->state &= ~Qclosed; | |
| 1993/0601 | q->state |= Qstarve; | |
| 1993/0908 | q->eof = 0; | |
| 1994/0902 | q->limit = q->inilim; | |
| 1998/0328 | iunlock(q); | |
| 1993/0530 | } /* * return bytes queued */ int qlen(Queue *q) { | |
| 1998/0922 | return q->dlen; | |
| 1993/0601 | } /* | |
| 1994/0327 | * return space remaining before flow control */ int qwindow(Queue *q) { int l; l = q->limit - q->len; if(l < 0) l = 0; return l; } /* | |
| 1993/0601 | * return true if we can read without blocking */ int qcanread(Queue *q) { return q->bfirst!=0; | |
| 1994/0902 | } /* * change queue limit */ void qsetlimit(Queue *q, int limit) { q->limit = limit; } /* * set blocking/nonblocking */ void qnoblock(Queue *q, int onoff) { q->noblock = onoff; | |
| 1994/0927 | } /* * flush the output queue */ void qflush(Queue *q) { | |
| 1997/0327 | Block *bfirst; | |
| 1994/0927 | /* mark it */ ilock(q); bfirst = q->bfirst; q->bfirst = 0; q->len = 0; | |
| 1998/0922 | q->dlen = 0; | |
| 1994/0927 | iunlock(q); /* free queued blocks */ | |
| 1997/0327 | freeblist(bfirst); | |
| 1994/0927 | /* wake up readers/writers */ wakeup(&q->wr); | |
| 1994/1124 | } int | |
| 1997/0327 | qfull(Queue *q) | |
| 1994/1124 | { | |
| 1997/0327 | return q->state & Qflow; | |
| 1993/0526 | } | |
| 1995/1217 | ||
| 1997/0327 | int qstate(Queue *q) | |
| 1995/1217 | { | |
| 1997/0327 | return q->state; | |
| 1995/1217 | } | |