| plan 9 kernel history: overview | file list | diff list |
1992/0625/port/stream.c (diff list | history)
| port/stream.c on 1990/0227 | ||
| 1990/0227 | #include "u.h" | |
| 1992/0321 | #include "../port/lib.h" | |
| 1990/0227 | #include "mem.h" #include "dat.h" #include "fns.h" #include "io.h" | |
| 1992/0111 | #include "../port/error.h" | |
| 1990/0227 | #include "devtab.h" | |
| 1990/0312 | /* | |
| 1992/0623 | * Part 1) Blocks | |
| 1990/0312 | */ | |
| 1990/0227 | ||
| 1990/0312 | /* | |
| 1992/0623 | * Allocate a block. Put the data portion at the end of the smalloc'd * chunk so that it can easily grow from the front to add protocol * headers. Thank Larry Peterson for the suggestion. | |
| 1990/0312 | */ | |
| 1990/0227 | Block * allocb(ulong size) { Block *bp; | |
| 1992/0623 | uchar *base, *lim; | |
| 1990/0227 | ||
| 1992/0619 | bp = smalloc(sizeof(Block)+size); | |
| 1990/0227 | ||
| 1992/0623 | base = (uchar*)bp + sizeof(Block); lim = (uchar*)bp + msize(bp); bp->wptr = bp->rptr = lim - size; bp->base = base; bp->lim = lim; | |
| 1992/0619 | bp->flags = 0; | |
| 1990/0227 | bp->next = 0; | |
| 1991/1122 | bp->list = 0; | |
| 1990/0227 | bp->type = M_DATA; return bp; } /* | |
| 1990/0312 | * Free a block (or list of blocks). Poison its pointers so that | |
| 1992/0623 | * someone trying to access it after freeing will cause a panic. | |
| 1990/0227 | */ void freeb(Block *bp) { | |
| 1992/0619 | Block *next; | |
| 1990/0227 | ||
| 1992/0619 | while(bp){ bp->rptr = 0; bp->wptr = 0; next = bp->next; free(bp); bp = next; | |
| 1990/0312 | } | |
| 1990/0227 | } /* | |
| 1992/0623 | * Pad a block to the front with n bytes. This is used to add protocol * headers to the front of blocks. | |
| 1990/0911 | */ Block * padb(Block *bp, int n) { Block *nbp; if(bp->base && bp->rptr-bp->base>=n){ bp->rptr -= n; return bp; } else { nbp = allocb(n); nbp->wptr = nbp->lim; nbp->rptr = nbp->wptr - n; nbp->next = bp; return nbp; } } /* | |
| 1992/0623 | * make sure the first block has n bytes */ Block * pullup(Block *bp, int n) { Block *nbp; int i; /* * this should almost always be true, the rest it * just for to avoid every caller checking. */ if(BLEN(bp) >= n) return bp; /* * if not enough room in the first block, * add another to the front of the list. */ if(bp->lim - bp->rptr < n){ nbp = allocb(n); nbp->next = bp; bp = nbp; } /* * copy bytes from the trailing blocks into the first */ n -= BLEN(bp); while(nbp = bp->next){ i = BLEN(nbp); if(i >= n) { memmove(bp->wptr, nbp->rptr, n); bp->wptr += n; nbp->rptr += n; return bp; } else { memmove(bp->wptr, nbp->rptr, i); bp->wptr += i; bp->next = nbp->next; nbp->next = 0; freeb(nbp); n -= i; } } freeb(bp); return 0; } /* * return the number of data bytes of a list of blocks */ int blen(Block *bp) { int len; len = 0; while(bp) { len += BLEN(bp); bp = bp->next; } return len; } /* * round a block chain to some even number of bytes. Used * by devip.c becuase all IP packets must have an even number * of bytes. * * The last block in the returned chain will have S_DELIM set. */ int bround(Block *bp, int amount) { Block *last; int len, pad; len = 0; SET(last); /* Ken's magic */ while(bp) { len += BLEN(bp); last = bp; bp = bp->next; } pad = ((len + amount) & ~amount) - len; if(pad) { if(last->lim - last->wptr >= pad){ memset(last->wptr, 0, pad); last->wptr += pad; } else { last->next = allocb(pad); last->flags &= ~S_DELIM; last = last->next; last->wptr += pad; last->flags |= S_DELIM; } } return len + pad; } /* * expand a block list to be one block, len bytes long. used by * ethernet routines. 
*/ Block* expandb(Block *bp, int len) { Block *nbp, *new; int i; ulong delim = 0; new = allocb(len); if(new == 0){ freeb(bp); return 0; } /* * copy bytes into new block */ for(nbp = bp; len>0 && nbp; nbp = nbp->next){ delim = nbp->flags & S_DELIM; i = BLEN(nbp); if(i > len) { memmove(new->wptr, nbp->rptr, len); new->wptr += len; break; } else { memmove(new->wptr, nbp->rptr, i); new->wptr += i; len -= i; } } if(len){ memset(new->wptr, 0, len); new->wptr += len; } new->flags |= delim; freeb(bp); return new; } /* * make a copy of the first 'count' bytes of a block chain. Use * by transport protocols. */ Block * copyb(Block *bp, int count) { Block *nb, *head, **p; int l; p = &head; while(count) { l = BLEN(bp); if(count < l) l = count; nb = allocb(l); if(nb == 0) panic("copyb.1"); memmove(nb->wptr, bp->rptr, l); nb->wptr += l; count -= l; if(bp->flags & S_DELIM) nb->flags |= S_DELIM; *p = nb; p = &nb->next; bp = bp->next; if(bp == 0) break; } if(count) { nb = allocb(count); if(nb == 0) panic("copyb.2"); memset(nb->wptr, 0, count); nb->wptr += count; nb->flags |= S_DELIM; *p = nb; } if(blen(head) == 0) print("copyb: zero length\n"); return head; } /* * Part 2) Queues */ /* * process end line discipline */ | |
| 1992/0625 | static Streamput stputq; | |
| 1992/0623 | Qinfo procinfo = { stputq, nullput, 0, 0, "process" }; /* * line disciplines that can be pushed */ static Qinfo *lds; /* * make known a stream module and call its initialization routine, if * it has one. */ void newqinfo(Qinfo *qi) { if(qi->next) panic("newqinfo: already configured"); qi->next = lds; lds = qi; if(qi->reset) (*qi->reset)(); } /* * find the info structure for line discipline 'name' */ Qinfo * qinfofind(char *name) { Qinfo *qi; if(name == 0) return 0; for(qi = lds; qi; qi = qi->next) if(strcmp(qi->name, name)==0) return qi; return 0; } /* | |
| 1990/0227 | * allocate a pair of queues. flavor them with the requested put routines. * the `QINUSE' flag on the read side is the only one used. */ static Queue * allocq(Qinfo *qi) { Queue *q, *wq; | |
| 1992/0623 | q = smalloc(2*sizeof(Queue)); | |
| 1990/0227 | q->flag = QINUSE; q->r.p = 0; q->info = qi; q->put = qi->iput; | |
| 1990/0403 | q->len = q->nb = 0; | |
| 1990/1009 | q->ptr = 0; | |
| 1990/1212 | q->rp = &q->r; | |
| 1990/0227 | wq = q->other = q + 1; | |
| 1990/0702 | wq->flag = QINUSE; | |
| 1990/0227 | wq->r.p = 0; wq->info = qi; wq->put = qi->oput; wq->other = q; | |
| 1990/1009 | wq->ptr = 0; | |
| 1990/0403 | wq->len = wq->nb = 0; | |
| 1990/1212 | wq->rp = &wq->r; | |
| 1990/0227 | return q; } /* | |
| 1992/0623 | * free a queue | |
| 1990/0629 | */ static void | |
| 1992/0623 | freeq(Queue *q) | |
| 1990/0629 | { Block *bp; q = RD(q); while(bp = getq(q)) freeb(bp); q = WR(q); while(bp = getq(q)) freeb(bp); | |
| 1992/0623 | free(RD(q)); | |
| 1990/0629 | } /* | |
| 1992/0623 | * flush a queue | |
| 1990/0227 | */ static void | |
| 1992/0623 | flushq(Queue *q) | |
| 1990/0227 | { Block *bp; q = RD(q); while(bp = getq(q)) freeb(bp); q = WR(q); while(bp = getq(q)) freeb(bp); } /* * push a queue onto a stream referenced by the proc side write q */ Queue * pushq(Stream* s, Qinfo *qi) { Queue *q; Queue *nq; q = RD(s->procq); /* * make the new queue */ nq = allocq(qi); /* * push */ | |
| 1991/0413 | qlock(s); | |
| 1990/0227 | RD(nq)->next = q; RD(WR(q)->next)->next = RD(nq); WR(nq)->next = WR(q)->next; WR(q)->next = WR(nq); | |
| 1991/0413 | qunlock(s); | |
| 1990/0227 | if(qi->open) (*qi->open)(RD(nq), s); return WR(nq)->next; } /* * pop off the top line discipline */ static void popq(Stream *s) { Queue *q; | |
| 1991/0413 | if(waserror()){ qunlock(s); nexterror(); } qlock(s); | |
| 1990/0227 | if(s->procq->next == WR(s->devq)) | |
| 1990/11211 | error(Ebadld); | |
| 1990/0227 | q = s->procq->next; if(q->info->close) (*q->info->close)(RD(q)); s->procq->next = q->next; RD(q->next)->next = RD(s->procq); | |
| 1991/0413 | qunlock(s); | |
| 1990/0227 | freeq(q); } /* * add a block (or list of blocks) to the end of a queue. return true * if one of the blocks contained a delimiter. */ int putq(Queue *q, Block *bp) { int delim; lock(q); if(q->first) q->last->next = bp; else q->first = bp; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1990/0227 | delim = bp->flags & S_DELIM; while(bp->next) { bp = bp->next; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1990/0227 | delim |= bp->flags & S_DELIM; } q->last = bp; | |
| 1990/0403 | if(q->len >= Streamhi || q->nb >= Streambhi) | |
| 1990/0227 | q->flag |= QHIWAT; unlock(q); return delim; } | |
| 1991/0316 | ||
| 1990/0227 | int putb(Blist *q, Block *bp) { int delim; if(q->first) q->last->next = bp; else q->first = bp; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0227 | delim = bp->flags & S_DELIM; while(bp->next) { bp = bp->next; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0227 | delim |= bp->flags & S_DELIM; } q->last = bp; return delim; } /* * add a block to the start of a queue */ | |
| 1990/0312 | void | |
| 1990/0227 | putbq(Blist *q, Block *bp) { lock(q); if(q->first) bp->next = q->first; else q->last = bp; q->first = bp; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1990/0227 | unlock(q); } /* | |
| 1990/0312 | * remove the first block from a queue | |
| 1990/0227 | */ Block * getq(Queue *q) { Block *bp; lock(q); bp = q->first; if(bp) { q->first = bp->next; if(q->first == 0) q->last = 0; | |
| 1990/0312 | q->len -= BLEN(bp); | |
| 1990/0403 | q->nb--; | |
| 1991/1126 | if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2 &&q->other){ | |
| 1990/1212 | wakeup(q->other->next->other->rp); | |
| 1990/0227 | q->flag &= ~QHIWAT; } bp->next = 0; } unlock(q); return bp; } | |
| 1990/0312 | /* | |
| 1992/0609 | * grab all the blocks in a queue */ Block * grabq(Queue *q) { Block *bp; lock(q); bp = q->first; if(bp){ q->first = 0; q->last = 0; q->len = 0; q->nb = 0; if(q->flag&QHIWAT){ wakeup(q->other->next->other->rp); q->flag &= ~QHIWAT; } } unlock(q); return bp; } /* | |
| 1990/0312 | * remove the first block from a list of blocks */ | |
| 1990/0227 | Block * getb(Blist *q) { Block *bp; bp = q->first; if(bp) { q->first = bp->next; if(q->first == 0) q->last = 0; | |
| 1990/0312 | q->len -= BLEN(bp); | |
| 1990/0227 | bp->next = 0; } return bp; } | |
| 1990/0907 | /* | |
| 1990/0227 | * put a block into the bit bucket */ void nullput(Queue *q, Block *bp) { | |
| 1991/1115 | USED(q); | |
| 1990/0629 | if(bp->type == M_HANGUP) freeb(bp); else { freeb(bp); | |
| 1990/11211 | error(Ehungup); | |
| 1990/0629 | } | |
| 1990/0227 | } /* | |
| 1992/0623 | * Part 3) Streams | |
| 1990/0227 | */ /* | |
| 1992/0623 | * the per stream directory structure | |
| 1990/0227 | */ | |
| 1992/0623 | Dirtab streamdir[]={ "data", {Sdataqid}, 0, 0600, "ctl", {Sctlqid}, 0, 0600, }; | |
| 1990/0227 | /* | |
| 1992/0623 | * hash buckets containing all streams | |
| 1990/0227 | */ | |
| 1992/0623 | enum | |
| 1990/0227 | { | |
| 1992/0623 | Nbits= 5, Nhash= 1<<Nbits, Nmask= Nhash-1, }; typedef struct Sthash Sthash; struct Sthash { QLock; Stream *s; }; static Sthash ht[Nhash]; | |
| 1990/0227 | ||
| 1992/0623 | static void hangup(Stream*); | |
| 1990/0227 | /* * A stream device consists of the contents of streamdir plus * any directory supplied by the actual device. * * values of s: * 0 to ntab-1 apply to the auxiliary directory. * ntab to ntab+Shighqid-Slowqid+1 apply to streamdir. */ int streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp) { Proc *p; char buf[NAMELEN]; if(s < ntab) tab = &tab[s]; else if(s < ntab + Shighqid - Slowqid + 1) tab = &streamdir[s - ntab]; else return -1; | |
| 1991/1109 | devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, tab->name, tab->length, eve, tab->perm, dp); | |
| 1990/0227 | return 1; } /* | |
| 1992/0623 | * return a hash bucket for a stream */ static Sthash* hash(int type, int dev, int id) { return &ht[(type*7*7 + dev*7 + id) & Nmask]; } /* | |
| 1990/1009 | * create a new stream, if noopen is non-zero, don't increment the open count | |
| 1990/0227 | */ Stream * | |
| 1990/1009 | streamnew(ushort type, ushort dev, ushort id, Qinfo *qi, int noopen) | |
| 1990/0227 | { Stream *s; Queue *q; | |
| 1992/0623 | Sthash *hb; | |
| 1990/0227 | ||
| 1992/0623 | hb = hash(type, dev, id); | |
| 1990/0227 | /* | |
| 1992/0623 | * if the stream already exists, just increment the reference counts. | |
| 1990/0227 | */ | |
| 1992/0623 | qlock(hb); for(s = hb->s; s; s = s->next) { if(s->type == type && s->dev == dev && s->id == id){ s->inuse++; qunlock(hb); if(noopen == 0){ qlock(s); s->opens++; | |
| 1990/11161 | qunlock(s); | |
| 1990/0227 | } | |
| 1992/0623 | return s; | |
| 1990/0227 | } } /* | |
| 1992/0623 | * create and init a new stream | |
| 1990/0227 | */ | |
| 1992/0623 | s = smalloc(sizeof(Stream)); s->inuse = 1; | |
| 1990/1009 | s->type = type; s->dev = dev; s->id = id; | |
| 1991/0411 | s->err = 0; | |
| 1992/0623 | s->hread = 0; s->next = hb->s; hb->s = s; | |
| 1990/0227 | /* | |
| 1992/0623 | * The ordering of these 2 instructions is very important. * It makes sure we finish the stream initialization before * anyone else can access it. */ qlock(s); qunlock(hb); if(waserror()){ qunlock(s); streamclose1(s); nexterror(); } /* | |
| 1990/0227 | * hang a device and process q off the stream */ | |
| 1990/1009 | if(noopen) s->opens = 0; else s->opens = 1; | |
| 1990/0227 | q = allocq(&procinfo); | |
| 1991/0411 | WR(q)->ptr = s; RD(q)->ptr = s; | |
| 1990/0227 | s->procq = WR(q); q = allocq(qi); s->devq = RD(q); WR(s->procq)->next = WR(s->devq); RD(s->procq)->next = 0; RD(s->devq)->next = RD(s->procq); WR(s->devq)->next = 0; if(qi->open) (*qi->open)(RD(s->devq), s); | |
| 1990/11161 | qunlock(s); | |
| 1990/0227 | poperror(); return s; } /* | |
| 1992/0623 | * Associate a stream with a channel | |
| 1990/0227 | */ void streamopen(Chan *c, Qinfo *qi) { | |
| 1990/11211 | c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0); | |
| 1990/0227 | } /* | |
| 1992/0623 | * Enter a stream only if the stream exists and is open. Increment the * reference count so it can't disappear under foot. * * Return -1 if the stream no longer exists or is not opened. | |
| 1990/0629 | */ int streamenter(Stream *s) { | |
| 1992/0623 | Sthash *hb; Stream *ns; hb = hash(s->type, s->dev, s->id); qlock(hb); for(ns = hb->s; ns; ns = ns->next) if(s->type == ns->type && s->dev == ns->dev && s->id == ns->id){ s->inuse++; qunlock(hb); if(s->opens == 0){ streamexit(s, 1); return -1; } return 0; } qunlock(hb); return -1; | |
| 1990/0629 | } /* * Decrement the reference count on a stream. If the count is * zero, free the stream. */ | |
void
streamexit(Stream *s, int locked)
{
	Queue *q, *nq;
	Sthash *hb;
	Stream **l;

	/* note: the `locked' argument is not examined here */
	hb = hash(s->type, s->dev, s->id);
	qlock(hb);
	if(s->inuse-- == 1){
		/* last reference gone; the stream must already be closed */
		if(s->opens != 0)
			panic("streamexit %d %s\n", s->opens, s->devq->info->name);

		/*
		 *  ascend the stream freeing the queues
		 */
		for(q = s->devq; q; q = nq){
			nq = q->next;
			freeq(q);
		}
		if(s->err)
			freeb(s->err);

		/*
		 *  unchain it from the hash bucket and free
		 */
		for(l = &hb->s; *l; l = &(*l)->next){
			if(*l == s){
				*l = s->next;
				break;
			}
		}
		free(s);
	}
	qunlock(hb);
}

/*
 *  nail down a stream so that it can't be closed: take an extra
 *  open and an extra reference that are never given back here.
 */
void
naildownstream(Stream *s)
{
	s->opens++;
	s->inuse++;
}

/*
 *  Decrement the open count.  When it goes to zero, call the close
 *  routines for each queue in the stream.
 */
| 1990/11211 | int | |
| 1990/1009 | streamclose1(Stream *s) | |
| 1990/0227 | { Queue *q, *nq; Block *bp; | |
| 1990/11211 | int rv; | |
| 1990/0227 | /* | |
| 1992/0623 | * decrement the open count | |
| 1990/0227 | */ | |
| 1990/11161 | qlock(s); | |
| 1992/0623 | if(s->opens-- == 1){ | |
| 1990/11161 | /* * descend the stream closing the queues */ for(q = s->procq; q; q = q->next){ | |
| 1991/1227 | if(!waserror()){ | |
| 1990/1011 | if(q->info->close) (*q->info->close)(q->other); | |
| 1990/11161 | poperror(); | |
| 1990/1011 | } | |
| 1990/11161 | WR(q)->put = nullput; /* * this may be 2 streams joined device end to device end */ if(q == s->devq->other) break; | |
| 1990/0629 | } /* * ascend the stream flushing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; flushq(q); } | |
| 1990/0227 | } | |
| 1992/0623 | rv = s->opens; qunlock(s); | |
| 1990/0227 | /* | |
| 1990/0629 | * leave it and free it | |
| 1990/0227 | */ | |
| 1990/0629 | streamexit(s, 1); | |
| 1990/11211 | return rv; | |
| 1990/0227 | } | |
| 1990/11211 | int | |
| 1990/1009 | streamclose(Chan *c) { /* * if no stream, ignore it */ if(!c->stream) | |
| 1990/1214 | return 0; | |
| 1990/11211 | return streamclose1(c->stream); | |
| 1990/1009 | } | |
| 1990/0227 | /* * put a block to be read into the queue. wakeup any waiting reader */ void stputq(Queue *q, Block *bp) { | |
| 1992/0625 | int awaken; | |
| 1991/0411 | Stream *s; | |
| 1990/0227 | if(bp->type == M_HANGUP){ | |
| 1991/0411 | s = q->ptr; if(bp->rptr<bp->wptr && s->err==0) s->err = bp; else freeb(bp); | |
| 1990/0227 | q->flag |= QHUNGUP; q->other->flag |= QHUNGUP; | |
| 1990/1212 | wakeup(q->other->rp); | |
| 1992/0625 | awaken = 1; | |
| 1990/0227 | } else { lock(q); if(q->first) q->last->next = bp; else q->first = bp; | |
| 1990/0312 | q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1992/0625 | awaken = bp->flags & S_DELIM; | |
| 1990/0312 | while(bp->next) { bp = bp->next; q->len += BLEN(bp); | |
| 1990/0403 | q->nb++; | |
| 1992/0625 | awaken |= bp->flags & S_DELIM; | |
| 1990/0312 | } | |
| 1990/0227 | q->last = bp; | |
| 1990/0403 | if(q->len >= Streamhi || q->nb >= Streambhi){ | |
| 1990/0227 | q->flag |= QHIWAT; | |
| 1992/0625 | awaken = 1; | |
| 1990/0321 | } | |
| 1990/0227 | unlock(q); } | |
| 1992/0625 | if(awaken) | |
| 1990/1212 | wakeup(q->rp); | |
| 1990/0227 | } /* | |
| 1990/0930 | * return the stream id */ long streamctlread(Chan *c, void *vbuf, long n) { | |
| 1992/0623 | char *buf = vbuf; | |
| 1990/0930 | char num[32]; Stream *s; s = c->stream; | |
| 1990/11211 | if(STREAMTYPE(c->qid.path) == Sctlqid){ | |
| 1990/0930 | sprint(num, "%d", s->id); | |
| 1992/0623 | return readstr(c->offset, buf, n, num); | |
| 1990/0930 | } else { | |
| 1990/11211 | if(CHDIR & c->qid.path) | |
| 1990/0930 | return devdirread(c, vbuf, n, 0, 0, streamgen); else panic("streamctlread"); } | |
| 1992/0520 | return 0; /* not reached */ | |
| 1990/0930 | } /* | |
| 1990/0227 | * return true if there is an output buffer available */ static int isinput(void *x) { | |
| 1990/0930 | Queue *q; q = (Queue *)x; return (q->flag&QHUNGUP) || q->first!=0; | |
| 1990/0227 | } /* * read until we fill the buffer or until a DELIM is encountered */ long streamread(Chan *c, void *vbuf, long n) { Block *bp; | |
| 1991/0811 | Block *tofree; | |
| 1990/0227 | Stream *s; Queue *q; | |
| 1990/0930 | int left, i; | |
| 1990/0227 | uchar *buf = vbuf; | |
| 1990/11211 | if(STREAMTYPE(c->qid.path) != Sdataqid) | |
| 1990/0930 | return streamctlread(c, vbuf, n); | |
| 1990/0227 | /* * one reader at a time */ | |
| 1990/0930 | s = c->stream; | |
| 1991/0809 | left = n; | |
| 1990/0227 | qlock(&s->rdlock); | |
| 1991/0811 | tofree = 0; | |
| 1991/0904 | q = 0; | |
| 1990/0227 | if(waserror()){ | |
| 1991/0809 | /* | |
| 1991/0811 | * put any partially read message back into the * queue | |
| 1991/0809 | */ | |
| 1991/0811 | while(tofree){ bp = tofree; tofree = bp->next; bp->next = 0; putbq(q, bp); } | |
| 1990/0227 | qunlock(&s->rdlock); nexterror(); } /* * sleep till data is available */ q = RD(s->procq); while(left){ bp = getq(q); if(bp == 0){ | |
| 1990/0331 | if(q->flag & QHUNGUP){ | |
| 1991/0411 | if(s->err) | |
| 1992/0114 | error((char*)s->err->rptr); | |
| 1991/0411 | else if(s->hread++<3) | |
| 1990/0331 | break; else | |
| 1990/11211 | error(Ehungup); | |
| 1990/0331 | } | |
| 1990/1212 | q->rp = &q->r; | |
| 1992/0207 | sleep(q->rp, isinput, (void *)q); | |
| 1991/0809 | continue; } | |
| 1990/0312 | i = BLEN(bp); | |
| 1990/0227 | if(i <= left){ | |
| 1991/0318 | memmove(buf, bp->rptr, i); | |
| 1990/0227 | left -= i; buf += i; | |
| 1991/0811 | bp->next = tofree; tofree = bp; if(bp->flags & S_DELIM) | |
| 1990/0227 | break; } else { | |
| 1991/0318 | memmove(buf, bp->rptr, left); | |
| 1990/0227 | bp->rptr += left; putbq(q, bp); left = 0; } | |
| 1991/0501 | } | |
| 1991/0811 | /* * free completely read blocks */ if(tofree) freeb(tofree); | |
| 1990/0227 | qunlock(&s->rdlock); poperror(); return n - left; } /* | |
| 1990/1202 | * look for an instance of the line discipline `name' on * the stream `s' */ void qlook(Stream *s, char *name) { Queue *q; for(q = s->procq; q; q = q->next){ if(strcmp(q->info->name, name) == 0) return; /* * this may be 2 streams joined device end to device end */ if(q == s->devq->other) break; } | |
| 1992/0114 | error(Ebadarg); | |
| 1990/1202 | } /* | |
| 1990/0227 | * Handle a ctl request. Streamwide requests are: * * hangup -- send an M_HANGUP up the stream * push ldname -- push the line discipline named ldname * pop -- pop a line discipline | |
| 1990/1202 | * look ldname -- look for a line discipline | |
| 1990/0227 | * | |
| 1991/1105 | * This routing is entered with s->wrlock'ed and must unlock. | |
| 1990/0227 | */ static long | |
| 1990/0930 | streamctlwrite(Chan *c, void *a, long n) | |
| 1990/0227 | { Qinfo *qi; Block *bp; | |
| 1990/0930 | Stream *s; | |
| 1990/0227 | ||
| 1990/11211 | if(STREAMTYPE(c->qid.path) != Sctlqid) | |
| 1990/0930 | panic("streamctlwrite %lux", c->qid); s = c->stream; | |
| 1990/0227 | /* * package */ bp = allocb(n+1); | |
| 1991/0318 | memmove(bp->wptr, a, n); | |
| 1990/0227 | bp->wptr[n] = 0; bp->wptr += n + 1; /* * check for standard requests */ if(streamparse("hangup", bp)){ hangup(s); freeb(bp); } else if(streamparse("push", bp)){ qi = qinfofind((char *)bp->rptr); | |
| 1992/0318 | if(qi == 0) error(Ebadld); | |
| 1990/0227 | pushq(s, qi); freeb(bp); } else if(streamparse("pop", bp)){ popq(s); | |
| 1990/1202 | freeb(bp); } else if(streamparse("look", bp)){ qlook(s, (char *)bp->rptr); | |
| 1990/0227 | freeb(bp); } else { bp->type = M_CTL; bp->flags |= S_DELIM; PUTNEXT(s->procq, bp); } return n; } /* * wait till there's room in the next stream */ static int notfull(void *arg) { | |
| 1990/0406 | return !QFULL((Queue *)arg); | |
| 1990/0227 | } void | |
| 1992/0305 | flowctl(Queue *q, Block *bp) | |
| 1990/0227 | { | |
| 1992/0305 | if(bp->type != M_HANGUP){ qlock(&q->rlock); if(waserror()){ qunlock(&q->rlock); freeb(bp); nexterror(); } q->rp = &q->r; sleep(q->rp, notfull, q->next); | |
| 1990/1113 | qunlock(&q->rlock); | |
| 1992/0305 | poperror(); | |
| 1990/1113 | } | |
| 1992/0305 | PUTNEXT(q, bp); | |
| 1990/0227 | } /* * send the request as a single delimited block */ long | |
| 1990/0312 | streamwrite(Chan *c, void *a, long n, int docopy) | |
| 1990/0227 | { Stream *s; Queue *q; long rem; int i; | |
| 1992/0625 | Block *bp; char *va; | |
| 1990/0227 | ||
| 1990/0911 | s = c->stream; | |
| 1990/0227 | /* * decode the qid */ | |
| 1990/11211 | if(STREAMTYPE(c->qid.path) != Sdataqid) | |
| 1990/0930 | return streamctlwrite(c, a, n); | |
| 1990/0227 | /* * No writes allowed on hungup channels */ q = s->procq; | |
| 1991/0411 | if(q->other->flag & QHUNGUP){ if(s->err) | |
| 1992/0114 | error((char*)(s->err->rptr)); | |
| 1991/0411 | else error(Ehungup); } | |
| 1990/0227 | ||
| 1991/0502 | /* | |
| 1992/0625 | * Write the message using blocks <= Streamhi bytes longs | |
| 1991/0502 | */ | |
| 1992/0625 | va = a; rem = n; for(;;){ if(rem > Streamhi) i = Streamhi; else | |
| 1991/0926 | i = rem; | |
| 1992/0625 | bp = allocb(i); memmove(bp->wptr, va, i); | |
| 1991/0926 | bp->wptr += i; | |
| 1992/0625 | va += i; rem -= i; if(rem > 0){ FLOWCTL(q, bp); } else { bp->flags |= S_DELIM; FLOWCTL(q, bp); | |
| 1991/0926 | break; | |
| 1992/0625 | } | |
| 1991/0502 | } | |
| 1990/0227 | return n; | |
| 1990/0312 | } /* | |
| 1990/0801 | * stat a stream. the length is the number of bytes up to the * first delimiter. */ void streamstat(Chan *c, char *db, char *name) { Dir dir; Stream *s; Queue *q; Block *bp; long n; s = c->stream; if(s == 0) | |
| 1990/0914 | n = 0; else { q = RD(s->procq); lock(q); for(n=0, bp=q->first; bp; bp = bp->next){ n += BLEN(bp); if(bp->flags&S_DELIM) break; } unlock(q); | |
| 1990/0801 | } | |
| 1991/1109 | devdir(c, c->qid, name, n, eve, 0, &dir); | |
| 1990/0801 | convD2M(&dir, db); | |
| 1992/0318 | } | |
| 1992/0623 | /* * send a hangup up a stream */ static void hangup(Stream *s) | |
| 1992/0318 | { | |
| 1992/0623 | Block *bp; | |
| 1992/0318 | ||
| 1992/0623 | bp = allocb(0); bp->type = M_HANGUP; (*s->devq->put)(s->devq, bp); } | |
| 1992/0318 | ||
| 1992/0623 | /* * parse a string and return a pointer to the second element if the * first matches name. bp->rptr will be updated to point to the * second element. * * return 0 if no match. * * it is assumed that the block data is null terminated. streamwrite * guarantees this. */ int streamparse(char *name, Block *bp) { int len; len = strlen(name); if(BLEN(bp) < len) return 0; if(strncmp(name, (char *)bp->rptr, len)==0){ if(bp->rptr[len] == ' ') bp->rptr += len+1; else if(bp->rptr[len]) return 0; else bp->rptr += len; while(*bp->rptr==' ' && bp->wptr>bp->rptr) bp->rptr++; return 1; } return 0; | |
| 1990/0911 | } /* | |
| 1992/0623 | * like andrew's getmfields but no hidden state | |
| 1990/0911 | */ | |
int
getfields(char *lp, char **fields, int n, char sep)
{
	int nf;

	/*
	 *  split lp in place into at most n fields separated by runs
	 *  of sep.  separators in front of a field are overwritten
	 *  with NULs; once n fields have been found the rest of the
	 *  string is left as part of the last one.  returns the
	 *  number of fields stored in fields[].
	 */
	nf = 0;
	if(lp == 0)
		return nf;
	while(*lp && nf < n){
		while(*lp == sep)
			*lp++ = 0;
		if(*lp == 0)
			break;		/* only separators were left */
		fields[nf++] = lp;
		while(*lp && *lp != sep)
			lp++;
	}
	return nf;
}