| plan 9 kernel history: overview | file list | diff list |
1991/0328/port/stream.c (diff list | history)
| port/stream.c on 1990/0227 | ||
| 1990/0227 | #include "u.h" #include "lib.h" #include "mem.h" #include "dat.h" #include "fns.h" #include "io.h" #include "errno.h" #include "devtab.h" | |
| 1990/0801 | #include "fcall.h" | |
| 1990/0227 | ||
| 1990/0321 | enum { | |
| 1990/0907 | Nclass=4, /* number of block classes */ | |
| 1990/0321 | }; | |
| 1990/0312 | /* * process end line discipline */ | |
| 1990/0227 | static void stputq(Queue*, Block*); | |
| 1990/11151 | Qinfo procinfo = { stputq, nullput, 0, 0, "process" }; | |
| 1990/0227 | ||
| 1990/0312 | /* * line disciplines that can be pushed */ | |
| 1990/0911 | static Qinfo *lds; | |
| 1990/0227 | /* * All stream structures are ialloc'd at boot time */ Stream *slist; Queue *qlist; static Lock garbagelock; /* * The block classes. There are Nclass block sizes, each with its own free list. * All are ialloced at qinit() time. */ typedef struct { int size; | |
| 1990/1127 | int lim; int made; | |
| 1990/0312 | Blist; | |
| 1990/03292 | QLock; /* qlock for sleepers on r */ | |
| 1990/0312 | Rendez r; /* sleep here waiting for blocks */ | |
| 1990/0227 | } Bclass; Bclass bclass[Nclass]={ { 0 }, | |
| 1990/0911 | { 68 }, { 260 }, | |
| 1990/0409 | { 4096 }, | |
| 1990/0227 | }; | |
| 1990/0331 | /* | |
| 1990/0227 | * Allocate streams, queues, and blocks. Allocate n block classes with * 1/2(m+1) to class m < n-1 * 1/2(n-1) to class n-1 */ void streaminit(void) { | |
| 1990/0907 | int class, i, n; | |
| 1990/0227 | Bclass *bcp; | |
| 1990/0911 | /* | |
| 1990/1127 | * allocate queues, streams | |
| 1990/0911 | */ | |
| 1990/0227 | slist = (Stream *)ialloc(conf.nstream * sizeof(Stream), 0); qlist = (Queue *)ialloc(conf.nqueue * sizeof(Queue), 0); | |
| 1990/1127 | /* * set limits on blocks */ | |
| 1990/0227 | n = conf.nblock; for(class = 0; class < Nclass; class++){ if(class < Nclass-1) n = n/2; bcp = &bclass[class]; | |
| 1990/1127 | bcp->lim = n; bcp->made = 0; | |
| 1990/0227 | } | |
| 1990/0911 | /* * make stream modules available */ streaminit0(); | |
| 1990/0227 | } /* | |
| 1990/0911 | * make known a stream module and call its initialization routine, if * it has one. */ void newqinfo(Qinfo *qi) { qi->next = lds; lds = qi; if(qi->reset) (*qi->reset)(); } /* | |
| 1990/1127 | * upgrade a block 0 block to another class (called with bcp qlocked) */ | |
| 1990/1214 | int | |
| 1990/1127 | newblock(Bclass *bcp) { Page *page; int n; Block *bp; uchar *cp; if(bcp->made > bcp->lim) | |
| 1990/1214 | return -1; | |
| 1990/1127 | if(bcp == bclass){ /* * create some level zero blocks and return */ page = newpage(1, 0, 0); page->va = VA(kmap(page)); n = BY2PG/sizeof(Block); bp = (Block *)(page->va); while(n-- > 0){ bp->flags = 0; bp->base = bp->lim = bp->rptr = bp->wptr = 0; if(bcp->first) bcp->last->next = bp; else bcp->first = bp; bcp->last = bp; bcp->made++; bp++; } } else { /* * create a page worth of new blocks */ page = newpage(1, 0, 0); page->va = VA(kmap(page)); n = BY2PG/bcp->size; cp = (uchar *)(page->va); while(n-- > 0){ /* * upgrade a level 0 block */ bp = allocb(0); qlock(bclass); bclass->made--; bcp->made++; bp->flags = bcp - bclass; qunlock(bclass); /* * tack on the data area */ bp->base = bp->rptr = bp->wptr = cp; cp += bcp->size; bp->lim = cp; if(bcp->first) bcp->last->next = bp; else bcp->first = bp; bcp->last = bp; } } | |
| 1990/1214 | return 0; | |
| 1990/1127 | } /* | |
| 1990/0227 | * allocate a block */ static int isblock(void *arg) { Bclass *bcp; bcp = (Bclass *)arg; return bcp->first!=0; } Block * allocb(ulong size) { Block *bp; Bclass *bcp; /* * map size to class */ for(bcp=bclass; bcp->size<size && bcp<&bclass[Nclass-1]; bcp++) ; /* | |
| 1990/0930 | * look for a free block | |
| 1990/0227 | */ lock(bcp); while(bcp->first == 0){ | |
| 1990/1127 | if(newblock(bcp) == 0) continue; | |
| 1990/0227 | unlock(bcp); | |
| 1990/03292 | qlock(bcp); | |
| 1990/1113 | if(waserror()){ qunlock(bcp); nexterror(); } | |
| 1990/0322 | tsleep(&bcp->r, isblock, (void *)bcp, 250); | |
| 1990/03292 | qunlock(bcp); | |
| 1990/1113 | poperror(); | |
| 1990/0227 | lock(bcp); } bp = bcp->first; bcp->first = bp->next; if(bcp->first == 0) bcp->last = 0; unlock(bcp); /* * return an empty block */ bp->rptr = bp->wptr = bp->base; bp->next = 0; bp->type = M_DATA; bp->flags &= S_CLASS; | |
| 1991/0328 | if(bp->lim-bp->rptr<size && size<4096) panic("allocb %lux %lux %d %ux %d", bp->lim, bp->rptr, size, bp->flags, bcp-bclass); | |
| 1990/0227 | return bp; } /* | |
| 1990/0312 | * Free a block (or list of blocks). Poison its pointers so that * someone trying to access it after freeing will cause a dump. | |
| 1990/0227 | */ void freeb(Block *bp) { | |
| 1991/0328 | Block *nbp; | |
| 1990/0227 | Bclass *bcp; | |
| 1990/1229 | int x; | |
| 1990/0227 | ||
| 1990/0513 | if((bp->flags&S_CLASS) >= Nclass) panic("freeb class"); | |
| 1991/0328 | for(; bp; bp = nbp){ bcp = &bclass[bp->flags & S_CLASS]; lock(bcp); bp->rptr = bp->wptr = 0; if(bcp->first) bcp->last->next = bp; else bcp->first = bp; bcp->last = bp; nbp = bp->next; bp->next = 0; unlock(bcp); if(bcp->r.p) wakeup(&bcp->r); | |
| 1990/0312 | } | |
| 1990/0227 | } /* | |
| 1990/0911 | * pad a block to the front with n bytes */ Block * padb(Block *bp, int n) { Block *nbp; if(bp->base && bp->rptr-bp->base>=n){ bp->rptr -= n; return bp; } else { nbp = allocb(n); nbp->wptr = nbp->lim; nbp->rptr = nbp->wptr - n; nbp->next = bp; return nbp; } } /* | |
| 1990/0227 | * allocate a pair of queues. flavor them with the requested put routines. * the `QINUSE' flag on the read side is the only one used. */ static Queue * allocq(Qinfo *qi) { Queue *q, *wq; for(q=qlist; q<&qlist[conf.nqueue]; q++, q++) { if(q->flag == 0){ if(canlock(q)){ if(q->flag == 0) break; unlock(q); } } } if(q == &qlist[conf.nqueue]){ print("no more queues\n"); | |
| 1990/11211 | error(Enoqueue); | |
| 1990/0227 | } q->flag = QINUSE; q->r.p = 0; q->info = qi; q->put = qi->iput; | |
| 1990/0403 | q->len = q->nb = 0; | |
| 1990/1009 | q->ptr = 0; | |
| 1990/1212 | q->rp = &q->r; | |
| 1990/0227 | wq = q->other = q + 1; | |
| 1990/0702 | wq->flag = QINUSE; | |
| 1990/0227 | wq->r.p = 0; wq->info = qi; wq->put = qi->oput; wq->other = q; | |
| 1990/1009 | wq->ptr = 0; | |
| 1990/0403 | wq->len = wq->nb = 0; | |
| 1990/1212 | wq->rp = &wq->r; | |
| 1990/0227 | unlock(q); return q; } /* | |
| 1990/0629 | * flush a queue */ static void flushq(Queue *q) { Block *bp; q = RD(q); while(bp = getq(q)) freeb(bp); q = WR(q); while(bp = getq(q)) freeb(bp); } /* | |
| 1990/0227 | * free a queue */ static void freeq(Queue *q) { Block *bp; q = RD(q); while(bp = getq(q)) freeb(bp); q = WR(q); while(bp = getq(q)) freeb(bp); RD(q)->flag = 0; } /* * push a queue onto a stream referenced by the proc side write q */ Queue * pushq(Stream* s, Qinfo *qi) { Queue *q; Queue *nq; q = RD(s->procq); /* * make the new queue */ nq = allocq(qi); /* * push */ RD(nq)->next = q; RD(WR(q)->next)->next = RD(nq); WR(nq)->next = WR(q)->next; WR(q)->next = WR(nq); if(qi->open) (*qi->open)(RD(nq), s); return WR(nq)->next; } /* * pop off the top line discipline */ static void popq(Stream *s) { Queue *q; if(s->procq->next == WR(s->devq)) | |
| 1990/11211 | error(Ebadld); | |
| 1990/0227 | q = s->procq->next; if(q->info->close) (*q->info->close)(RD(q)); s->procq->next = q->next; RD(q->next)->next = RD(s->procq); freeq(q); } /* * add a block (or list of blocks) to the end of a queue. return true * if one of the blocks contained a delimiter. */ int putq(Queue *q, Block *bp) { int delim; lock(q); if(q->first) q->last->next = bp; else q->first = bp; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1990/0227 | delim = bp->flags & S_DELIM; while(bp->next) { bp = bp->next; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1990/0227 | delim |= bp->flags & S_DELIM; } q->last = bp; | |
| 1990/0403 | if(q->len >= Streamhi || q->nb >= Streambhi) | |
| 1990/0227 | q->flag |= QHIWAT; unlock(q); return delim; } | |
| 1991/0316 | ||
| 1990/0227 | int | |
| 1991/0316 | blen(Block *bp) { int len; len = 0; while(bp) { len += BLEN(bp); bp = bp->next; } return len; } /* | |
| 1991/0317 | * bround - round a block chain to some 2^n number of bytes | |
| 1991/0316 | */ int bround(Block *bp, int amount) { Block *last; int len, pad; len = 0; | |
| 1991/0317 | SET(last); /* Ken's magic */ | |
| 1991/0316 | while(bp) { len += BLEN(bp); last = bp; bp = bp->next; } pad = ((len + amount) & ~amount) - len; if(pad) { last->next = allocb(pad); last->flags &= ~S_DELIM; | |
| 1991/0317 | last = last->next; memset(last->wptr, 0, pad); last->wptr += pad; last->flags |= S_DELIM; | |
| 1991/0316 | } return len + pad; } int | |
| 1990/0227 | putb(Blist *q, Block *bp) { int delim; if(q->first) q->last->next = bp; else q->first = bp; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0227 | delim = bp->flags & S_DELIM; while(bp->next) { bp = bp->next; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0227 | delim |= bp->flags & S_DELIM; } q->last = bp; return delim; } /* * add a block to the start of a queue */ | |
| 1990/0312 | void | |
| 1990/0227 | putbq(Blist *q, Block *bp) { lock(q); if(q->first) bp->next = q->first; else q->last = bp; q->first = bp; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1990/0227 | unlock(q); } /* | |
| 1990/0312 | * remove the first block from a queue | |
| 1990/0227 | */ Block * getq(Queue *q) { Block *bp; lock(q); bp = q->first; if(bp) { q->first = bp->next; if(q->first == 0) q->last = 0; | |
| 1990/0312 | q->len -= BLEN(bp); | |
| 1990/0403 | q->nb--; | |
| 1990/0406 | if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2){ | |
| 1990/1212 | wakeup(q->other->next->other->rp); | |
| 1990/0227 | q->flag &= ~QHIWAT; } bp->next = 0; } unlock(q); return bp; } | |
| 1990/0312 | /* * remove the first block from a list of blocks */ | |
| 1990/0227 | Block * getb(Blist *q) { Block *bp; bp = q->first; if(bp) { q->first = bp->next; if(q->first == 0) q->last = 0; | |
| 1990/0312 | q->len -= BLEN(bp); | |
| 1990/0227 | bp->next = 0; } return bp; } /* | |
| 1990/0907 | * make sure the first block has n bytes */ Block * pullup(Block *bp, int n) { Block *nbp; int i; /* * this should almost always be true, the rest it * just for to avoid every caller checking. */ if(BLEN(bp) >= n) return bp; /* * if not enough room in the first block, * add another to the front of the list. if(bp->lim - bp->rptr < n){ nbp = allocb(n); nbp->next = bp; bp = nbp; } /* * copy bytes from the trailing blocks into the first */ n -= BLEN(bp); while(nbp = bp->next){ i = BLEN(nbp); if(i > n) { | |
| 1991/0318 | memmove(bp->wptr, nbp->rptr, n); | |
| 1990/0907 | bp->wptr += n; nbp->rptr += n; return bp; } else { | |
| 1991/0318 | memmove(bp->wptr, nbp->rptr, i); | |
| 1990/0907 | bp->wptr += i; bp->next = nbp->next; nbp->next = 0; freeb(nbp); } } freeb(bp); return 0; } /* | |
| 1990/0312 | * grow the front of a list of blocks by n bytes */ Block * prepend(Block *bp, int n) { Block *nbp; if(bp->base && (bp->rptr - bp->base)>=n){ /* * room for channel number in first block of message */ bp->rptr -= n; return bp; } else { /* * make new block, put message number at end */ nbp = allocb(2); nbp->next = bp; nbp->wptr = nbp->lim; nbp->rptr = nbp->wptr - n; return nbp; } } | |
| 1991/0316 | ||
| 1990/0312 | /* | |
| 1990/0227 | * put a block into the bit bucket */ void nullput(Queue *q, Block *bp) { | |
| 1990/0629 | if(bp->type == M_HANGUP) freeb(bp); else { freeb(bp); | |
| 1990/11211 | error(Ehungup); | |
| 1990/0629 | } | |
| 1990/0227 | } /* * find the info structure for line discipline 'name' */ static Qinfo * qinfofind(char *name) { | |
| 1990/0911 | Qinfo *qi; | |
| 1990/0227 | if(name == 0) | |
| 1990/11211 | error(Ebadld); | |
| 1990/0911 | for(qi = lds; qi; qi = qi->next) if(strcmp(qi->name, name)==0) return qi; | |
| 1990/11211 | error(Ebadld); | |
| 1990/0227 | } /* * send a hangup up a stream */ static void hangup(Stream *s) { Block *bp; bp = allocb(0); bp->type = M_HANGUP; (*s->devq->put)(s->devq, bp); } /* * parse a string and return a pointer to the second element if the * first matches name. bp->rptr will be updated to point to the * second element. * * return 0 if no match. * * it is assumed that the block data is null terminated. streamwrite * guarantees this. */ int streamparse(char *name, Block *bp) { int len; len = strlen(name); | |
| 1990/0312 | if(BLEN(bp) < len) | |
| 1990/0227 | return 0; if(strncmp(name, (char *)bp->rptr, len)==0){ if(bp->rptr[len] == ' ') bp->rptr += len+1; else if(bp->rptr[len]) return 0; else bp->rptr += len; return 1; } return 0; } /* | |
| 1990/0907 | * the per stream directory structure */ Dirtab streamdir[]={ | |
| 1990/11211 | "data", {Sdataqid}, 0, 0600, "ctl", {Sctlqid}, 0, 0600, | |
| 1990/0907 | }; /* | |
| 1990/0227 | * A stream device consists of the contents of streamdir plus * any directory supplied by the actual device. * * values of s: * 0 to ntab-1 apply to the auxiliary directory. * ntab to ntab+Shighqid-Slowqid+1 apply to streamdir. */ int streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp) { Proc *p; char buf[NAMELEN]; if(s < ntab) tab = &tab[s]; else if(s < ntab + Shighqid - Slowqid + 1) tab = &streamdir[s - ntab]; else return -1; | |
| 1990/11211 | devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, tab->name, tab->length, | |
| 1990/0227 | tab->perm, dp); return 1; } /* | |
| 1990/1009 | * create a new stream, if noopen is non-zero, don't increment the open count | |
| 1990/0227 | */ Stream * | |
| 1990/1009 | streamnew(ushort type, ushort dev, ushort id, Qinfo *qi, int noopen) | |
| 1990/0227 | { Stream *s; Queue *q; /* * find a free stream struct */ for(s = slist; s < &slist[conf.nstream]; s++) { if(s->inuse == 0){ | |
| 1990/11161 | if(canqlock(s)){ | |
| 1990/0227 | if(s->inuse == 0) break; | |
| 1990/11161 | qunlock(s); | |
| 1990/0227 | } } } if(s == &slist[conf.nstream]){ print("no more streams\n"); | |
| 1990/11211 | error(Enostream); | |
| 1990/0227 | } if(waserror()){ | |
| 1990/11161 | qunlock(s); | |
| 1990/1009 | streamclose1(s); | |
| 1990/0227 | nexterror(); } /* | |
| 1990/1009 | * identify the stream | |
| 1990/0227 | */ | |
| 1990/1009 | s->type = type; s->dev = dev; s->id = id; | |
| 1990/0227 | /* * hang a device and process q off the stream */ s->inuse = 1; | |
| 1990/1009 | if(noopen) s->opens = 0; else s->opens = 1; | |
| 1990/0331 | s->hread = 0; | |
| 1990/0227 | q = allocq(&procinfo); s->procq = WR(q); q = allocq(qi); s->devq = RD(q); WR(s->procq)->next = WR(s->devq); RD(s->procq)->next = 0; RD(s->devq)->next = RD(s->procq); WR(s->devq)->next = 0; if(qi->open) (*qi->open)(RD(s->devq), s); | |
| 1990/11161 | qunlock(s); | |
| 1990/0227 | poperror(); return s; } /* * (Re)open a stream. If this is the first open, create a stream. */ void streamopen(Chan *c, Qinfo *qi) { Stream *s; Queue *q; /* | |
| 1990/1009 | * if the stream already exists, just increment the reference counts. | |
| 1990/0227 | */ for(s = slist; s < &slist[conf.nstream]; s++) { if(s->inuse && s->type == c->type && s->dev == c->dev | |
| 1990/11211 | && s->id == STREAMID(c->qid.path)){ | |
| 1990/11161 | qlock(s); | |
| 1990/0227 | if(s->inuse && s->type == c->type && s->dev == c->dev | |
| 1990/11211 | && s->id == STREAMID(c->qid.path)){ | |
| 1990/0227 | s->inuse++; | |
| 1990/0629 | s->opens++; | |
| 1990/0227 | c->stream = s; | |
| 1990/11161 | qunlock(s); | |
| 1990/0227 | return; } | |
| 1990/11161 | qunlock(s); | |
| 1990/0227 | } } /* * create a new stream */ | |
| 1990/11211 | c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0); | |
| 1990/0227 | } /* | |
| 1990/0629 | * Enter a stream. Increment the reference count so it can't disappear * under foot. */ int streamenter(Stream *s) { | |
| 1990/11161 | qlock(s); | |
| 1990/0629 | if(s->opens == 0){ | |
| 1990/11161 | qunlock(s); | |
| 1990/0629 | return -1; } s->inuse++; | |
| 1990/11161 | qunlock(s); | |
| 1990/0629 | return 0; } /* * Decrement the reference count on a stream. If the count is * zero, free the stream. */ | |
| 1990/1101 | int | |
| 1990/0629 | streamexit(Stream *s, int locked) { Queue *q; Queue *nq; | |
| 1990/1101 | int rv; | |
| 1990/1104 | char *name; | |
| 1990/0629 | if(!locked) | |
| 1990/11161 | qlock(s); | |
| 1990/0629 | if(s->inuse == 1){ | |
| 1990/1104 | if(s->opens != 0) | |
| 1990/11151 | panic("streamexit %d %s\n", s->opens, s->devq->info->name); | |
| 1990/1104 | ||
| 1990/0629 | /* * ascend the stream freeing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; freeq(q); } s->id = s->dev = s->type = 0; } s->inuse--; | |
| 1990/1101 | rv = s->inuse; | |
| 1990/0629 | if(!locked) | |
| 1990/11161 | qunlock(s); | |
| 1990/1101 | return rv; | |
| 1990/0629 | } /* | |
| 1990/0227 | * On the last close of a stream, for each queue on the * stream release its blocks and call its close routine. */ | |
| 1990/11211 | int | |
| 1990/1009 | streamclose1(Stream *s) | |
| 1990/0227 | { Queue *q, *nq; Block *bp; | |
| 1990/11211 | int rv; | |
| 1990/0227 | /* | |
| 1990/0629 | * decrement the reference count | |
| 1990/0227 | */ | |
| 1990/11161 | qlock(s); | |
| 1990/0629 | if(s->opens == 1){ | |
| 1990/11161 | /* * descend the stream closing the queues */ for(q = s->procq; q; q = q->next){ if(!waserror()){ | |
| 1990/1011 | if(q->info->close) (*q->info->close)(q->other); | |
| 1990/11161 | poperror(); | |
| 1990/1011 | } | |
| 1990/11161 | WR(q)->put = nullput; /* * this may be 2 streams joined device end to device end */ if(q == s->devq->other) break; | |
| 1990/0629 | } /* * ascend the stream flushing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; flushq(q); } | |
| 1990/0227 | } | |
| 1990/11211 | rv = --(s->opens); | |
| 1990/0227 | /* | |
| 1990/0629 | * leave it and free it | |
| 1990/0227 | */ | |
| 1990/0629 | streamexit(s, 1); | |
| 1990/11161 | qunlock(s); | |
| 1990/11211 | return rv; | |
| 1990/0227 | } | |
| 1990/11211 | int | |
| 1990/1009 | streamclose(Chan *c) { /* * if no stream, ignore it */ if(!c->stream) | |
| 1990/1214 | return 0; | |
| 1990/11211 | return streamclose1(c->stream); | |
| 1990/1009 | } | |
| 1990/0227 | /* * put a block to be read into the queue. wakeup any waiting reader */ void stputq(Queue *q, Block *bp) { | |
| 1990/0321 | int delim; | |
| 1990/0227 | if(bp->type == M_HANGUP){ freeb(bp); q->flag |= QHUNGUP; q->other->flag |= QHUNGUP; | |
| 1990/1212 | wakeup(q->other->rp); | |
| 1990/0321 | delim = 1; | |
| 1990/0227 | } else { lock(q); if(q->first) q->last->next = bp; else q->first = bp; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1990/0321 | delim = bp->flags & S_DELIM; | |
| 1990/0312 | while(bp->next) { bp = bp->next; q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1990/0321 | delim |= bp->flags & S_DELIM; | |
| 1990/0312 | } | |
| 1990/0227 | q->last = bp; | |
| 1990/0403 | if(q->len >= Streamhi || q->nb >= Streambhi){ | |
| 1990/0227 | q->flag |= QHIWAT; | |
| 1990/0321 | delim = 1; } | |
| 1990/0227 | unlock(q); } | |
| 1990/0321 | if(delim) | |
| 1990/1212 | wakeup(q->rp); | |
| 1990/0227 | } /* * read a string. update the offset accordingly. */ long stringread(Chan *c, uchar *buf, long n, char *str) { long i; i = strlen(str); i -= c->offset; if(i<n) n = i; if(n<0) return 0; | |
| 1991/0318 | memmove(buf, str + c->offset, n); | |
| 1990/0227 | return n; } /* | |
| 1990/0930 | * return the stream id */ long streamctlread(Chan *c, void *vbuf, long n) { uchar *buf = vbuf; char num[32]; Stream *s; s = c->stream; | |
| 1990/11211 | if(STREAMTYPE(c->qid.path) == Sctlqid){ | |
| 1990/0930 | sprint(num, "%d", s->id); return stringread(c, buf, n, num); } else { | |
| 1990/11211 | if(CHDIR & c->qid.path) | |
| 1990/0930 | return devdirread(c, vbuf, n, 0, 0, streamgen); else panic("streamctlread"); } } /* | |
| 1990/0227 | * return true if there is an output buffer available */ static int isinput(void *x) { | |
| 1990/0930 | Queue *q; q = (Queue *)x; return (q->flag&QHUNGUP) || q->first!=0; | |
| 1990/0227 | } /* * read until we fill the buffer or until a DELIM is encountered */ long streamread(Chan *c, void *vbuf, long n) { Block *bp; Stream *s; Queue *q; | |
| 1990/0930 | int left, i; | |
| 1990/0227 | uchar *buf = vbuf; | |
| 1990/11211 | if(STREAMTYPE(c->qid.path) != Sdataqid) | |
| 1990/0930 | return streamctlread(c, vbuf, n); | |
| 1990/0227 | /* * one reader at a time */ | |
| 1990/0930 | s = c->stream; | |
| 1990/0227 | qlock(&s->rdlock); if(waserror()){ qunlock(&s->rdlock); nexterror(); } /* * sleep till data is available */ q = RD(s->procq); left = n; while(left){ bp = getq(q); if(bp == 0){ | |
| 1990/0331 | if(q->flag & QHUNGUP){ if(s->hread++ < 3) break; else | |
| 1990/11211 | error(Ehungup); | |
| 1990/0331 | } | |
| 1990/1212 | q->rp = &q->r; sleep(q->rp, &isinput, (void *)q); | |
| 1990/0227 | continue; } | |
| 1990/0312 | i = BLEN(bp); | |
| 1990/0227 | if(i <= left){ | |
| 1991/0318 | memmove(buf, bp->rptr, i); | |
| 1990/0227 | left -= i; buf += i; if(bp->flags & S_DELIM){ freeb(bp); break; } else freeb(bp); } else { | |
| 1991/0318 | memmove(buf, bp->rptr, left); | |
| 1990/0227 | bp->rptr += left; putbq(q, bp); left = 0; } }; qunlock(&s->rdlock); poperror(); return n - left; } /* | |
| 1990/1202 | * look for an instance of the line discipline `name' on * the stream `s' */ void qlook(Stream *s, char *name) { Queue *q; for(q = s->procq; q; q = q->next){ if(strcmp(q->info->name, name) == 0) return; /* * this may be 2 streams joined device end to device end */ if(q == s->devq->other) break; } errors("not found"); } /* | |
| 1990/0227 | * Handle a ctl request. Streamwide requests are: * * hangup -- send an M_HANGUP up the stream * push ldname -- push the line discipline named ldname * pop -- pop a line discipline | |
| 1990/1202 | * look ldname -- look for a line discipline | |
| 1990/0227 | * * This routing is entrered with s->wrlock'ed and must unlock. */ static long | |
| 1990/0930 | streamctlwrite(Chan *c, void *a, long n) | |
| 1990/0227 | { Qinfo *qi; Block *bp; | |
| 1990/0930 | Stream *s; | |
| 1990/0227 | ||
| 1990/11211 | if(STREAMTYPE(c->qid.path) != Sctlqid) | |
| 1990/0930 | panic("streamctlwrite %lux", c->qid); s = c->stream; | |
| 1990/0227 | /* * package */ bp = allocb(n+1); | |
| 1991/0318 | memmove(bp->wptr, a, n); | |
| 1990/0227 | bp->wptr[n] = 0; bp->wptr += n + 1; /* * check for standard requests */ if(streamparse("hangup", bp)){ hangup(s); freeb(bp); } else if(streamparse("push", bp)){ qi = qinfofind((char *)bp->rptr); pushq(s, qi); freeb(bp); } else if(streamparse("pop", bp)){ popq(s); | |
| 1990/1202 | freeb(bp); } else if(streamparse("look", bp)){ qlook(s, (char *)bp->rptr); | |
| 1990/0227 | freeb(bp); } else { bp->type = M_CTL; bp->flags |= S_DELIM; PUTNEXT(s->procq, bp); } return n; } /* * wait till there's room in the next stream */ static int notfull(void *arg) { | |
| 1990/0406 | return !QFULL((Queue *)arg); | |
| 1990/0227 | } void flowctl(Queue *q) { | |
| 1990/1018 | qlock(&q->rlock); | |
| 1990/1113 | if(waserror()){ qunlock(&q->rlock); nexterror(); } | |
| 1990/1212 | q->rp = &q->r; sleep(q->rp, notfull, q->next); | |
| 1990/1018 | qunlock(&q->rlock); | |
| 1990/1113 | poperror(); | |
| 1990/0227 | } /* * send the request as a single delimited block */ long | |
| 1990/0312 | streamwrite(Chan *c, void *a, long n, int docopy) | |
| 1990/0227 | { Stream *s; Block *bp; Queue *q; long rem; int i; | |
| 1990/0911 | s = c->stream; | |
| 1990/0227 | /* * decode the qid */ | |
| 1990/11211 | if(STREAMTYPE(c->qid.path) != Sdataqid) | |
| 1990/0930 | return streamctlwrite(c, a, n); | |
| 1990/0227 | /* * No writes allowed on hungup channels */ q = s->procq; if(q->other->flag & QHUNGUP) | |
| 1990/11211 | error(Ehungup); | |
| 1990/0227 | ||
| 1990/0930 | if(!docopy && GLOBAL(a)){ | |
| 1990/0227 | /* * `a' is global to the whole system, just create a * pointer to it and pass it on. */ | |
| 1990/0403 | FLOWCTL(q); | |
| 1990/0227 | bp = allocb(0); bp->rptr = bp->base = (uchar *)a; bp->wptr = bp->lim = (uchar *)a+n; bp->flags |= S_DELIM; bp->type = M_DATA; PUTNEXT(q, bp); } else { /* * `a' is in the user's address space, copy it into * system buffers and pass the buffers on. */ for(rem = n; ; rem -= i) { | |
| 1990/0403 | FLOWCTL(q); | |
| 1990/0227 | bp = allocb(rem); i = bp->lim - bp->wptr; if(i >= rem){ | |
| 1991/0318 | memmove(bp->wptr, a, rem); | |
| 1990/0227 | bp->flags |= S_DELIM; bp->wptr += rem; bp->type = M_DATA; PUTNEXT(q, bp); break; } else { | |
| 1991/0318 | memmove(bp->wptr, a, i); | |
| 1990/0227 | bp->wptr += i; bp->type = M_DATA; PUTNEXT(q, bp); a = ((char*)a) + i; } } } return n; | |
| 1990/0312 | } /* * like andrew's getmfields but no hidden state */ int getfields(char *lp, /* to be parsed */ char **fields, /* where to put pointers */ int n, /* number of pointers */ char sep /* separator */ ) { int i; for(i=0; lp && *lp && i<n; i++){ while(*lp == sep) *lp++=0; if(*lp == 0) break; fields[i]=lp; while(*lp && *lp != sep) lp++; } return i; | |
| 1990/0801 | } /* * stat a stream. the length is the number of bytes up to the * first delimiter. */ void streamstat(Chan *c, char *db, char *name) { Dir dir; Stream *s; Queue *q; Block *bp; long n; s = c->stream; if(s == 0) | |
| 1990/0914 | n = 0; else { q = RD(s->procq); lock(q); for(n=0, bp=q->first; bp; bp = bp->next){ n += BLEN(bp); if(bp->flags&S_DELIM) break; } unlock(q); | |
| 1990/0801 | } devdir(c, c->qid, name, n, 0, &dir); convD2M(&dir, db); | |
| 1990/0911 | } /* * Dump all block information of how many blocks are in which queues */ void dumpblocks(Queue *q, char c) { Block *bp; uchar *cp; lock(q); for(bp = q->first; bp; bp = bp->next){ | |
| 1990/1009 | print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' '); | |
| 1990/0911 | for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+10; cp++) print(" %uo", *cp); print("\n"); } unlock(q); } void dumpqueues(void) { Queue *q; | |
| 1991/0317 | int count, qcount; | |
| 1990/0911 | Block *bp; Bclass *bcp; print("\n"); | |
| 1991/0317 | qcount = 0; | |
| 1990/0911 | for(q = qlist; q < qlist + conf.nqueue; q++, q++){ if(!(q->flag & QINUSE)) continue; | |
| 1991/0317 | qcount++; | |
| 1991/0323 | print("%10s %ux R n %d l %d f %ux r %ux", q->info->name, q, q->nb, q->len, q->flag, &(q->r)); | |
| 1991/0320 | print(" W n %d l %d f %ux r %ux next %lux put %lux Rz %lux\n", WR(q)->nb, WR(q)->len, WR(q)->flag, &(WR(q)->r), q->next, q->put, q->rp); | |
| 1990/0911 | dumpblocks(q, 'R'); dumpblocks(WR(q), 'W'); } | |
| 1991/0317 | print("%d queues\n", qcount); | |
| 1990/0911 | for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){ lock(bcp); for(count = 0, bp = bcp->first; bp; count++, bp = bp->next) ; unlock(bcp); print("%d blocks of size %d\n", count, bcp->size); } print("\n"); | |
| 1990/0227 | } | |