plan 9 kernel history: overview | file list | diff list

1992/0520/port/stream.c (diff list | history)

1992/0520/sys/src/9/port/stream.c:1,15131992/0529/sys/src/9/port/stream.c:1,1513 (short | long | prev | next)
1990/0227    
#include	"u.h" 
1992/0321    
#include	"../port/lib.h" 
1990/0227    
#include	"mem.h" 
#include	"dat.h" 
#include	"fns.h" 
#include	"io.h" 
1992/0111    
#include	"../port/error.h" 
1990/0227    
#include	"devtab.h" 
1990/0801    
#include	"fcall.h" 
1990/0227    
 
1990/0321    
enum { 
1990/0907    
	Nclass=4,	/* number of block classes */ 
1990/0321    
}; 
 
1990/0312    
/* 
 *  process end line discipline 
 */ 
1990/0227    
static void stputq(Queue*, Block*); 
1990/11151    
Qinfo procinfo = 
{ 
	stputq, 
	nullput, 
	0, 
	0, 
	"process" 
}; 
1990/0227    
 
1990/0312    
/* 
 *  line disciplines that can be pushed 
 */ 
1990/0911    
static Qinfo *lds; 
1990/0227    
 
/* 
 *  All stream structures are ialloc'd at boot time 
 */ 
Stream *slist; 
Queue *qlist; 
static Lock garbagelock; 
 
/* 
 *  The block classes.  There are Nclass block sizes, each with its own free list. 
 *  All are ialloced at qinit() time. 
 */ 
typedef struct { 
	int	size; 
1990/1127    
	int	made; 
1991/1107    
	QLock; 
1990/0312    
	Blist; 
1990/0227    
} Bclass; 
Bclass bclass[Nclass]={ 
	{ 0 }, 
1990/0911    
	{ 68 }, 
1991/0421    
	{ 268 }, 
1990/0409    
	{ 4096 }, 
1990/0227    
}; 
 
1990/0331    
/* 
1990/0227    
 *  Allocate streams, queues, and blocks.  Allocate n block classes with 
 *	1/2(m+1) to class m < n-1 
 *	1/2(n-1) to class n-1 
 */ 
void 
streaminit(void) 
{ 
1990/0907    
	int class, i, n; 
1990/0227    
	Bclass *bcp; 
 
1990/0911    
	/* 
1990/1127    
	 *  allocate queues, streams 
1990/0911    
	 */ 
1990/0227    
	slist = (Stream *)ialloc(conf.nstream * sizeof(Stream), 0); 
	qlist = (Queue *)ialloc(conf.nqueue * sizeof(Queue), 0); 
1990/1127    
 
	/* 
1991/0831    
	 *  set block classes 
1990/1127    
	 */ 
1990/0227    
	n = conf.nblock; 
	for(class = 0; class < Nclass; class++){ 
		if(class < Nclass-1) 
			n = n/2; 
		bcp = &bclass[class]; 
1990/1127    
		bcp->made = 0; 
1990/0227    
	} 
1990/0911    
 
	/* 
	 *  make stream modules available 
	 */ 
	streaminit0(); 
1990/0227    
} 
 
/* 
1990/0911    
 *  make known a stream module and call its initialization routine, if 
 *  it has one. 
 */ 
void 
newqinfo(Qinfo *qi) 
{ 
1991/1012    
	if(qi->next) 
		panic("newqinfo: already configured"); 
 
1990/0911    
	qi->next = lds; 
	lds = qi; 
	if(qi->reset) 
		(*qi->reset)(); 
} 
 
/* 
1990/1127    
 *  upgrade a block 0 block to another class (called with bcp qlocked) 
 */ 
1990/1214    
int 
1990/1127    
newblock(Bclass *bcp) 
{ 
	Page *page; 
	int n; 
	Block *bp; 
	uchar *cp; 
 
1991/0705    
	page = newpage(1, 0, 0); 
1992/0101    
	page->va = VA(kmapperm(page)); 
1990/1127    
	if(bcp == bclass){ 
		/* 
		 *  create some level zero blocks and return 
		 */ 
		n = BY2PG/sizeof(Block); 
		bp = (Block *)(page->va); 
		while(n-- > 0){ 
			bp->flags = 0; 
			bp->base = bp->lim = bp->rptr = bp->wptr = 0; 
			if(bcp->first) 
				bcp->last->next = bp; 
			else 
				bcp->first = bp; 
			bcp->last = bp; 
			bcp->made++; 
			bp++; 
		} 
	} else { 
		/* 
		 *  create a page worth of new blocks 
		 */ 
		n = BY2PG/bcp->size; 
		cp = (uchar *)(page->va); 
		 
		while(n-- > 0){ 
			/* 
			 *  upgrade a level 0 block 
			 */ 
			bp = allocb(0); 
1991/1107    
			qlock(bclass); 
1990/1127    
			bclass->made--; 
			bcp->made++; 
			bp->flags = bcp - bclass; 
1991/1107    
			qunlock(bclass); 
1990/1127    
 
			/* 
			 *  tack on the data area 
			 */ 
			bp->base = bp->rptr = bp->wptr = cp; 
			cp += bcp->size; 
			bp->lim = cp; 
			if(bcp->first) 
				bcp->last->next = bp; 
			else 
				bcp->first = bp; 
			bcp->last = bp; 
		} 
	} 
1990/1214    
	return 0; 
1990/1127    
} 
 
/* 
1990/0227    
 *  allocate a block 
 */ 
static int 
isblock(void *arg) 
{ 
	Bclass *bcp; 
 
	bcp = (Bclass *)arg; 
	return bcp->first!=0; 
} 
Block * 
allocb(ulong size) 
{ 
	Block *bp; 
	Bclass *bcp; 
 
	/* 
	 *  map size to class 
	 */ 
	for(bcp=bclass; bcp->size<size && bcp<&bclass[Nclass-1]; bcp++) 
		; 
 
	/* 
1990/0930    
	 *  look for a free block 
1990/0227    
	 */ 
1991/1107    
	qlock(bcp); 
1990/0227    
	while(bcp->first == 0){ 
1990/1113    
		if(waserror()){ 
1991/1107    
			qunlock(bcp); 
1990/1113    
			nexterror(); 
		} 
1991/0831    
		newblock(bcp); 
1990/1113    
		poperror(); 
1990/0227    
	} 
	bp = bcp->first; 
	bcp->first = bp->next; 
	if(bcp->first == 0) 
		bcp->last = 0; 
1991/1107    
	qunlock(bcp); 
1990/0227    
 
	/* 
	 *  return an empty block 
	 */ 
1991/0419    
	bp->flags = bcp - bclass; 
1990/0227    
	bp->rptr = bp->wptr = bp->base; 
	bp->next = 0; 
1991/1122    
	bp->list = 0; 
1990/0227    
	bp->type = M_DATA; 
	bp->flags &= S_CLASS; 
1991/0328    
	if(bp->lim-bp->rptr<size && size<4096) 
		panic("allocb %lux %lux %d %ux %d", bp->lim, bp->rptr, 
			size, bp->flags, bcp-bclass); 
1990/0227    
	return bp; 
} 
 
/* 
1990/0312    
 *  Free a block (or list of blocks).  Poison its pointers so that 
 *  someone trying to access it after freeing will cause a dump. 
1990/0227    
 */ 
void 
freeb(Block *bp) 
{ 
1991/0420    
	ulong mark[1]; 
1991/0328    
	Block *nbp; 
1990/0227    
	Bclass *bcp; 
1990/1229    
	int x; 
1992/0222    
	ulong pc; 
1990/0227    
 
1992/0222    
	pc = getcallerpc(((uchar*)&bp) - sizeof(bp)); 
	if((bp->flags&S_CLASS) >= Nclass)		/* Check for double free */ 
		panic("freeb class last(%lux) this(%lux)", bp->pc, pc); 
	bp->pc = pc; 
1991/0420    
 
1991/0328    
	for(; bp; bp = nbp){ 
		bcp = &bclass[bp->flags & S_CLASS]; 
1991/0516    
		bp->flags = bp->flags|S_CLASS;		/* Check for double free */ 
1991/1107    
		qlock(bcp); 
1991/0328    
		bp->rptr = bp->wptr = 0; 
		if(bcp->first) 
			bcp->last->next = bp; 
		else 
			bcp->first = bp; 
		bcp->last = bp; 
		nbp = bp->next; 
		bp->next = 0; 
1991/1107    
		qunlock(bcp); 
1990/0312    
	} 
1990/0227    
} 
 
/* 
1990/0911    
 *  pad a block to the front with n bytes 
 */ 
Block * 
padb(Block *bp, int n) 
{ 
	Block *nbp; 
 
	if(bp->base && bp->rptr-bp->base>=n){ 
		bp->rptr -= n; 
		return bp; 
	} else { 
		nbp = allocb(n); 
		nbp->wptr = nbp->lim; 
		nbp->rptr = nbp->wptr - n; 
		nbp->next = bp; 
		return nbp; 
	} 
}  
 
/* 
1990/0227    
 *  allocate a pair of queues.  flavor them with the requested put routines. 
 *  the `QINUSE' flag on the read side is the only one used. 
 */ 
static Queue * 
allocq(Qinfo *qi) 
{ 
	Queue *q, *wq; 
 
	for(q=qlist; q<&qlist[conf.nqueue]; q++, q++) { 
		if(q->flag == 0){ 
			if(canlock(q)){ 
				if(q->flag == 0) 
					break; 
				unlock(q); 
			} 
		} 
	} 
 
	if(q == &qlist[conf.nqueue]){ 
		print("no more queues\n"); 
1992/0114    
		exhausted("queues"); 
1990/0227    
	} 
 
	q->flag = QINUSE; 
	q->r.p = 0; 
	q->info = qi; 
	q->put = qi->iput; 
1990/0403    
	q->len = q->nb = 0; 
1990/1009    
	q->ptr = 0; 
1990/1212    
	q->rp = &q->r; 
1990/0227    
	wq = q->other = q + 1; 
 
1990/0702    
	wq->flag = QINUSE; 
1990/0227    
	wq->r.p = 0; 
	wq->info = qi; 
	wq->put = qi->oput; 
	wq->other = q; 
1990/1009    
	wq->ptr = 0; 
1990/0403    
	wq->len = wq->nb = 0; 
1990/1212    
	wq->rp = &wq->r; 
1990/0227    
 
	unlock(q); 
 
	return q; 
} 
 
/* 
1990/0629    
 *  flush a queue 
 */ 
static void 
flushq(Queue *q) 
{ 
	Block *bp; 
 
	q = RD(q); 
	while(bp = getq(q)) 
		freeb(bp); 
	q = WR(q); 
	while(bp = getq(q)) 
		freeb(bp); 
} 
 
/* 
1990/0227    
 *  free a queue 
 */ 
static void 
freeq(Queue *q) 
{ 
	Block *bp; 
 
	q = RD(q); 
	while(bp = getq(q)) 
		freeb(bp); 
	q = WR(q); 
	while(bp = getq(q)) 
		freeb(bp); 
	RD(q)->flag = 0; 
} 
 
/* 
 *  push a queue onto a stream referenced by the proc side write q 
 */ 
Queue * 
pushq(Stream* s, Qinfo *qi) 
{ 
	Queue *q; 
	Queue *nq; 
 
	q = RD(s->procq); 
 
	/* 
	 *  make the new queue 
	 */ 
	nq = allocq(qi); 
 
	/* 
	 *  push 
	 */ 
1991/0413    
	qlock(s); 
1990/0227    
	RD(nq)->next = q; 
	RD(WR(q)->next)->next = RD(nq); 
	WR(nq)->next = WR(q)->next; 
	WR(q)->next = WR(nq); 
1991/0413    
	qunlock(s); 
1990/0227    
 
	if(qi->open) 
		(*qi->open)(RD(nq), s); 
 
	return WR(nq)->next; 
} 
 
/* 
 *  pop off the top line discipline 
 */ 
static void 
popq(Stream *s) 
{ 
	Queue *q; 
 
1991/0413    
	if(waserror()){ 
		qunlock(s); 
		nexterror(); 
	} 
	qlock(s); 
1990/0227    
	if(s->procq->next == WR(s->devq)) 
1990/11211    
		error(Ebadld); 
1990/0227    
	q = s->procq->next; 
	if(q->info->close) 
		(*q->info->close)(RD(q)); 
	s->procq->next = q->next; 
	RD(q->next)->next = RD(s->procq); 
1991/0413    
	qunlock(s); 
1990/0227    
	freeq(q); 
} 
 
/* 
 *  add a block (or list of blocks) to the end of a queue.  return true 
 *  if one of the blocks contained a delimiter.  
 */ 
int 
putq(Queue *q, Block *bp) 
{ 
	int delim; 
 
	lock(q); 
	if(q->first) 
		q->last->next = bp; 
	else 
		q->first = bp; 
1990/0312    
	q->len += BLEN(bp); 
1990/0403    
	q->nb++; 
1990/0227    
	delim = bp->flags & S_DELIM; 
	while(bp->next) { 
		bp = bp->next; 
1990/0312    
		q->len += BLEN(bp); 
1990/0403    
		q->nb++; 
1990/0227    
		delim |= bp->flags & S_DELIM; 
	} 
	q->last = bp; 
1990/0403    
	if(q->len >= Streamhi || q->nb >= Streambhi) 
1990/0227    
		q->flag |= QHIWAT; 
	unlock(q); 
	return delim; 
} 
1991/0316    
 
1990/0227    
int 
1991/0316    
blen(Block *bp) 
{ 
	int len; 
 
	len = 0; 
	while(bp) { 
		len += BLEN(bp); 
		bp = bp->next; 
	} 
 
	return len; 
} 
 
/* 
1991/0317    
 * bround - round a block chain to some 2^n number of bytes 
1991/0316    
 */ 
int 
bround(Block *bp, int amount) 
{ 
	Block *last; 
	int len, pad; 
 
	len = 0; 
1991/0317    
	SET(last);	/* Ken's magic */ 
 
1991/0316    
	while(bp) { 
		len += BLEN(bp); 
		last = bp; 
		bp = bp->next; 
	} 
 
	pad = ((len + amount) & ~amount) - len; 
	if(pad) { 
		last->next = allocb(pad); 
		last->flags &= ~S_DELIM; 
1991/0317    
		last = last->next; 
		memset(last->wptr, 0, pad); 
		last->wptr += pad; 
		last->flags |= S_DELIM; 
		 
1991/0316    
	} 
 
	return len + pad; 
} 
 
int 
1990/0227    
putb(Blist *q, Block *bp) 
{ 
	int delim; 
 
	if(q->first) 
		q->last->next = bp; 
	else 
		q->first = bp; 
1990/0312    
	q->len += BLEN(bp); 
1990/0227    
	delim = bp->flags & S_DELIM; 
	while(bp->next) { 
		bp = bp->next; 
1990/0312    
		q->len += BLEN(bp); 
1990/0227    
		delim |= bp->flags & S_DELIM; 
	} 
	q->last = bp; 
	return delim; 
} 
 
/* 
 *  add a block to the start of a queue  
 */ 
1990/0312    
void 
1990/0227    
putbq(Blist *q, Block *bp) 
{ 
	lock(q); 
	if(q->first) 
		bp->next = q->first; 
	else 
		q->last = bp; 
	q->first = bp; 
1990/0312    
	q->len += BLEN(bp); 
1990/0403    
	q->nb++; 
1990/0227    
	unlock(q); 
} 
 
/* 
1990/0312    
 *  remove the first block from a queue 
1990/0227    
 */ 
Block * 
getq(Queue *q) 
{ 
	Block *bp; 
 
	lock(q); 
	bp = q->first; 
	if(bp) { 
		q->first = bp->next; 
		if(q->first == 0) 
			q->last = 0; 
1990/0312    
		q->len -= BLEN(bp); 
1990/0403    
		q->nb--; 
1991/1126    
		if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2 &&q->other){ 
1990/1212    
			wakeup(q->other->next->other->rp); 
1990/0227    
			q->flag &= ~QHIWAT; 
		} 
		bp->next = 0; 
	} 
	unlock(q); 
	return bp; 
} 
1990/0312    
 
/* 
 *  remove the first block from a list of blocks 
 */ 
1990/0227    
Block * 
getb(Blist *q) 
{ 
	Block *bp; 
 
	bp = q->first; 
	if(bp) { 
		q->first = bp->next; 
		if(q->first == 0) 
			q->last = 0; 
1990/0312    
		q->len -= BLEN(bp); 
1990/0227    
		bp->next = 0; 
	} 
	return bp; 
} 
 
/* 
1990/0907    
 *  make sure the first block has n bytes 
 */ 
Block * 
pullup(Block *bp, int n) 
{ 
	Block *nbp; 
	int i; 
 
	/* 
	 *  this should almost always be true, the rest it 
	 *  just for to avoid every caller checking. 
	 */ 
	if(BLEN(bp) >= n) 
		return bp; 
 
	/* 
	 *  if not enough room in the first block, 
	 *  add another to the front of the list. 
1991/0419    
	 */ 
1990/0907    
	if(bp->lim - bp->rptr < n){ 
		nbp = allocb(n); 
		nbp->next = bp; 
		bp = nbp; 
	} 
 
	/* 
	 *  copy bytes from the trailing blocks into the first 
	 */ 
	n -= BLEN(bp); 
	while(nbp = bp->next){ 
		i = BLEN(nbp); 
		if(i > n) { 
1992/0529    
		if(i >= n) { 
1991/0318    
			memmove(bp->wptr, nbp->rptr, n); 
1990/0907    
			bp->wptr += n; 
			nbp->rptr += n; 
			return bp; 
		} else { 
1991/0318    
			memmove(bp->wptr, nbp->rptr, i); 
1990/0907    
			bp->wptr += i; 
			bp->next = nbp->next; 
			nbp->next = 0; 
			freeb(nbp); 
		} 
	} 
	freeb(bp); 
	return 0; 
} 
 
/* 
1991/1107    
 *  expand a block list to be one byte, len bytes long 
1992/0529    
 *  expand a block list to be one block, len bytes long 
1991/1107    
 */ 
Block* 
expandb(Block *bp, int len) 
{ 
	Block *nbp, *new; 
	int i; 
1991/1121    
	ulong delim = 0; 
1991/1107    
 
	new = allocb(len); 
	if(new == 0){ 
		freeb(bp); 
		return 0; 
	} 
 
	/* 
	 *  copy bytes into new block 
	 */ 
	for(nbp = bp; len>0 && nbp; nbp = nbp->next){ 
1991/1121    
		delim = nbp->flags & S_DELIM; 
		i = BLEN(nbp); 
1991/1107    
		if(i > len) { 
			memmove(new->wptr, nbp->rptr, len); 
			new->wptr += len; 
			break; 
		} else { 
			memmove(new->wptr, nbp->rptr, i); 
			new->wptr += i; 
			len -= i; 
		} 
	} 
	if(len){ 
		memset(new->wptr, 0, len); 
		new->wptr += len; 
	} 
1991/1121    
	new->flags |= delim; 
1991/1107    
	freeb(bp); 
	return new; 
 
} 
 
/* 
1990/0312    
 *  grow the front of a list of blocks by n bytes 
 */ 
Block * 
prepend(Block *bp, int n) 
{ 
	Block *nbp; 
 
	if(bp->base && (bp->rptr - bp->base)>=n){ 
		/* 
		 *  room for channel number in first block of message 
		 */ 
		bp->rptr -= n; 
		return bp; 
	} else { 
		/* 
		 *  make new block, put message number at end 
		 */ 
		nbp = allocb(2); 
		nbp->next = bp; 
		nbp->wptr = nbp->lim; 
		nbp->rptr = nbp->wptr - n; 
		return nbp; 
	} 
} 
1991/0316    
 
1990/0312    
 
/* 
1990/0227    
 *  put a block into the bit bucket 
 */ 
void 
nullput(Queue *q, Block *bp) 
{ 
1991/1115    
	USED(q); 
1990/0629    
	if(bp->type == M_HANGUP) 
		freeb(bp); 
	else { 
		freeb(bp); 
1990/11211    
		error(Ehungup); 
1990/0629    
	} 
1990/0227    
} 
 
/* 
 *  find the info structure for line discipline 'name' 
 */ 
1992/0318    
Qinfo * 
1990/0227    
qinfofind(char *name) 
{ 
1990/0911    
	Qinfo *qi; 
1990/0227    
 
	if(name == 0) 
1992/0318    
		return 0; 
1990/0911    
	for(qi = lds; qi; qi = qi->next) 
		if(strcmp(qi->name, name)==0) 
			return qi; 
1992/0318    
	return 0; 
1990/0227    
} 
 
/* 
 *  send a hangup up a stream 
 */ 
static void 
hangup(Stream *s) 
{ 
	Block *bp; 
 
	bp = allocb(0); 
	bp->type = M_HANGUP; 
	(*s->devq->put)(s->devq, bp); 
} 
 
/* 
 *  parse a string and return a pointer to the second element if the  
 *  first matches name.  bp->rptr will be updated to point to the 
 *  second element. 
 * 
 *  return 0 if no match. 
 * 
 *  it is assumed that the block data is null terminated.  streamwrite 
 *  guarantees this. 
 */ 
int 
streamparse(char *name, Block *bp) 
{ 
	int len; 
 
	len = strlen(name); 
1990/0312    
	if(BLEN(bp) < len) 
1990/0227    
		return 0; 
	if(strncmp(name, (char *)bp->rptr, len)==0){ 
		if(bp->rptr[len] == ' ') 
			bp->rptr += len+1; 
		else if(bp->rptr[len]) 
			return 0; 
		else 
			bp->rptr += len; 
1991/0401    
		while(*bp->rptr==' ' && bp->wptr>bp->rptr) 
			bp->rptr++; 
1990/0227    
		return 1; 
	} 
	return 0; 
} 
 
/* 
1990/0907    
 *  the per stream directory structure 
 */ 
Dirtab streamdir[]={ 
1990/11211    
	"data",		{Sdataqid},	0,			0600, 
	"ctl",		{Sctlqid},	0,			0600, 
1990/0907    
}; 
 
/* 
1990/0227    
 *  A stream device consists of the contents of streamdir plus 
 *  any directory supplied by the actual device. 
 * 
 *  values of s: 
 * 	0 to ntab-1 apply to the auxiliary directory. 
 *	ntab to ntab+Shighqid-Slowqid+1 apply to streamdir. 
 */ 
int 
streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp) 
{ 
	Proc *p; 
	char buf[NAMELEN]; 
 
	if(s < ntab) 
		tab = &tab[s]; 
	else if(s < ntab + Shighqid - Slowqid + 1) 
		tab = &streamdir[s - ntab]; 
	else 
		return -1; 
 
1991/1109    
	devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0},  
		tab->name, tab->length, eve, tab->perm, dp); 
1990/0227    
	return 1; 
} 
 
/* 
1990/1009    
 *  create a new stream, if noopen is non-zero, don't increment the open count 
1990/0227    
 */ 
Stream * 
1990/1009    
streamnew(ushort type, ushort dev, ushort id, Qinfo *qi, int noopen) 
1990/0227    
{ 
	Stream *s; 
	Queue *q; 
 
	/* 
	 *  find a free stream struct 
	 */ 
	for(s = slist; s < &slist[conf.nstream]; s++) { 
		if(s->inuse == 0){ 
1990/11161    
			if(canqlock(s)){ 
1990/0227    
				if(s->inuse == 0) 
					break; 
1990/11161    
				qunlock(s); 
1990/0227    
			} 
		} 
	} 
	if(s == &slist[conf.nstream]){ 
		print("no more streams\n"); 
1992/0114    
		exhausted("streams"); 
1990/0227    
	} 
	if(waserror()){ 
1990/11161    
		qunlock(s); 
1990/1009    
		streamclose1(s); 
1990/0227    
		nexterror(); 
	} 
 
	/* 
1990/1009    
	 *  identify the stream 
1990/0227    
	 */ 
1990/1009    
	s->type = type; 
	s->dev = dev; 
	s->id = id; 
1991/0411    
	s->err = 0; 
1990/0227    
 
	/* 
 	 *  hang a device and process q off the stream 
	 */ 
	s->inuse = 1; 
1990/1009    
	if(noopen) 
		s->opens = 0; 
	else 
		s->opens = 1; 
1990/0331    
	s->hread = 0; 
1990/0227    
	q = allocq(&procinfo); 
1991/0411    
	WR(q)->ptr = s; 
	RD(q)->ptr = s; 
1990/0227    
	s->procq = WR(q); 
	q = allocq(qi); 
	s->devq = RD(q); 
	WR(s->procq)->next = WR(s->devq); 
	RD(s->procq)->next = 0; 
	RD(s->devq)->next = RD(s->procq); 
	WR(s->devq)->next = 0; 
 
	if(qi->open) 
		(*qi->open)(RD(s->devq), s); 
 
1990/11161    
	qunlock(s); 
1990/0227    
	poperror(); 
	return s; 
} 
 
/* 
 *  (Re)open a stream.  If this is the first open, create a stream. 
 */ 
void 
streamopen(Chan *c, Qinfo *qi) 
{ 
	Stream *s; 
	Queue *q; 
 
	/* 
1990/1009    
	 *  if the stream already exists, just increment the reference counts. 
1990/0227    
	 */ 
	for(s = slist; s < &slist[conf.nstream]; s++) { 
		if(s->inuse && s->type == c->type && s->dev == c->dev 
1990/11211    
		   && s->id == STREAMID(c->qid.path)){ 
1990/11161    
			qlock(s); 
1990/0227    
			if(s->inuse && s->type == c->type 
			&& s->dev == c->dev 
1990/11211    
		 	&& s->id == STREAMID(c->qid.path)){ 
1990/0227    
				s->inuse++; 
1990/0629    
				s->opens++; 
1990/0227    
				c->stream = s; 
1990/11161    
				qunlock(s); 
1990/0227    
				return; 
			} 
1990/11161    
			qunlock(s); 
1990/0227    
		} 
	} 
 
	/* 
	 *  create a new stream 
	 */ 
1990/11211    
	c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0); 
1990/0227    
} 
 
/* 
1990/0629    
 *  Enter a stream.  Increment the reference count so it can't disappear 
 *  under foot. 
 */ 
int 
streamenter(Stream *s) 
{ 
1990/11161    
	qlock(s); 
1990/0629    
	if(s->opens == 0){ 
1990/11161    
		qunlock(s); 
1990/0629    
		return -1; 
	} 
	s->inuse++; 
1990/11161    
	qunlock(s); 
1990/0629    
	return 0; 
} 
 
/* 
 *  Decrement the reference count on a stream.  If the count is 
 *  zero, free the stream. 
 */ 
1990/1101    
int 
1990/0629    
streamexit(Stream *s, int locked) 
{ 
	Queue *q; 
	Queue *nq; 
1990/1101    
	int rv; 
1990/1104    
	char *name; 
1990/0629    
 
	if(!locked) 
1990/11161    
		qlock(s); 
1990/0629    
	if(s->inuse == 1){ 
1990/1104    
		if(s->opens != 0) 
1990/11151    
			panic("streamexit %d %s\n", s->opens, s->devq->info->name); 
1990/1104    
 
1990/0629    
		/* 
		 *  ascend the stream freeing the queues 
		 */ 
		for(q = s->devq; q; q = nq){ 
			nq = q->next; 
			freeq(q); 
		} 
		s->id = s->dev = s->type = 0; 
1991/0411    
		if(s->err) 
			freeb(s->err); 
1990/0629    
	} 
	s->inuse--; 
1990/1101    
	rv = s->inuse; 
1990/0629    
	if(!locked) 
1990/11161    
		qunlock(s); 
1990/1101    
	return rv; 
1990/0629    
} 
 
/* 
1990/0227    
 *  On the last close of a stream, for each queue on the 
 *  stream release its blocks and call its close routine. 
 */ 
1990/11211    
int 
1990/1009    
streamclose1(Stream *s) 
1990/0227    
{ 
	Queue *q, *nq; 
	Block *bp; 
1990/11211    
	int rv; 
1990/0227    
 
	/* 
1990/0629    
	 *  decrement the reference count 
1990/0227    
	 */ 
1990/11161    
	qlock(s); 
1990/0629    
	if(s->opens == 1){ 
1990/11161    
		/* 
		 *  descend the stream closing the queues 
		 */ 
		for(q = s->procq; q; q = q->next){ 
1991/1227    
			if(!waserror()){ 
1990/1011    
				if(q->info->close) 
					(*q->info->close)(q->other); 
1990/11161    
				poperror(); 
1990/1011    
			} 
1990/11161    
			WR(q)->put = nullput; 
 
			/* 
			 *  this may be 2 streams joined device end to device end 
			 */ 
			if(q == s->devq->other) 
				break; 
1990/0629    
		} 
	 
		/* 
		 *  ascend the stream flushing the queues 
		 */ 
		for(q = s->devq; q; q = nq){ 
			nq = q->next; 
			flushq(q); 
		} 
1990/0227    
	} 
1990/11211    
	rv = --(s->opens); 
1990/0227    
 
	/* 
1990/0629    
	 *  leave it and free it 
1990/0227    
	 */ 
1990/0629    
	streamexit(s, 1); 
1990/11161    
	qunlock(s); 
1990/11211    
	return rv; 
1990/0227    
} 
1990/11211    
int 
1990/1009    
streamclose(Chan *c) 
{ 
	/* 
	 *  if no stream, ignore it 
	 */ 
	if(!c->stream) 
1990/1214    
		return 0; 
1990/11211    
	return streamclose1(c->stream); 
1990/1009    
} 
1990/0227    
 
/* 
 *  put a block to be read into the queue.  wakeup any waiting reader 
 */ 
void 
stputq(Queue *q, Block *bp) 
{ 
1990/0321    
	int delim; 
1991/0411    
	Stream *s; 
1990/0227    
 
	if(bp->type == M_HANGUP){ 
1991/0411    
		s = q->ptr; 
		if(bp->rptr<bp->wptr && s->err==0) 
			s->err = bp; 
		else 
			freeb(bp); 
1990/0227    
		q->flag |= QHUNGUP; 
		q->other->flag |= QHUNGUP; 
1990/1212    
		wakeup(q->other->rp); 
1990/0321    
		delim = 1; 
1990/0227    
	} else { 
		lock(q); 
		if(q->first) 
			q->last->next = bp; 
		else 
			q->first = bp; 
1990/0312    
		q->len += BLEN(bp); 
1990/0403    
		q->nb++; 
1990/0321    
		delim = bp->flags & S_DELIM; 
1990/0312    
		while(bp->next) { 
			bp = bp->next; 
			q->len += BLEN(bp); 
1990/0403    
			q->nb++; 
1990/0321    
			delim |= bp->flags & S_DELIM; 
1990/0312    
		} 
1990/0227    
		q->last = bp; 
1990/0403    
		if(q->len >= Streamhi || q->nb >= Streambhi){ 
1990/0227    
			q->flag |= QHIWAT; 
1990/0321    
			delim = 1; 
		} 
1990/0227    
		unlock(q); 
	} 
1990/0321    
	if(delim) 
1990/1212    
		wakeup(q->rp); 
1990/0227    
} 
 
/* 
 *  read a string.  update the offset accordingly. 
 */ 
long 
1991/1115    
stringread(uchar *buf, long n, char *str, ulong offset) 
1990/0227    
{ 
	long i; 
 
	i = strlen(str); 
1991/0411    
	i -= offset; 
1990/0227    
	if(i<n) 
		n = i; 
	if(n<0) 
		return 0; 
1991/0411    
	memmove(buf, str + offset, n); 
1990/0227    
	return n; 
} 
 
/* 
1990/0930    
 *  return the stream id 
 */ 
long 
streamctlread(Chan *c, void *vbuf, long n) 
{ 
	uchar *buf = vbuf; 
	char num[32]; 
	Stream *s; 
 
	s = c->stream; 
1990/11211    
	if(STREAMTYPE(c->qid.path) == Sctlqid){ 
1990/0930    
		sprint(num, "%d", s->id); 
1991/1115    
		return stringread(buf, n, num, c->offset); 
1990/0930    
	} else { 
1990/11211    
		if(CHDIR & c->qid.path) 
1990/0930    
			return devdirread(c, vbuf, n, 0, 0, streamgen); 
		else 
			panic("streamctlread"); 
	} 
1992/0520    
	return 0;	/* not reached */ 
1990/0930    
} 
 
/* 
1990/0227    
 *  return true if there is an output buffer available 
 */ 
static int 
isinput(void *x) 
{ 
1990/0930    
	Queue *q; 
 
	q = (Queue *)x; 
	return (q->flag&QHUNGUP) || q->first!=0; 
1990/0227    
} 
 
/* 
 *  read until we fill the buffer or until a DELIM is encountered 
 */ 
long 
streamread(Chan *c, void *vbuf, long n) 
{ 
	Block *bp; 
1991/0811    
	Block *tofree; 
1990/0227    
	Stream *s; 
	Queue *q; 
1990/0930    
	int left, i; 
1990/0227    
	uchar *buf = vbuf; 
 
1990/11211    
	if(STREAMTYPE(c->qid.path) != Sdataqid) 
1990/0930    
		return streamctlread(c, vbuf, n); 
1990/0227    
 
	/* 
	 *  one reader at a time 
	 */ 
1990/0930    
	s = c->stream; 
1991/0809    
	left = n; 
1990/0227    
	qlock(&s->rdlock); 
1991/0811    
	tofree = 0; 
1991/0904    
	q = 0; 
1990/0227    
	if(waserror()){ 
1991/0809    
		/* 
1991/0811    
		 *  put any partially read message back into the 
		 *  queue 
1991/0809    
		 */ 
1991/0811    
		while(tofree){ 
			bp = tofree; 
			tofree = bp->next; 
			bp->next = 0; 
			putbq(q, bp); 
		} 
1990/0227    
		qunlock(&s->rdlock); 
		nexterror(); 
	} 
 
	/* 
	 *  sleep till data is available 
	 */ 
	q = RD(s->procq); 
	while(left){ 
		bp = getq(q); 
		if(bp == 0){ 
1990/0331    
			if(q->flag & QHUNGUP){ 
1991/0411    
				if(s->err) 
1992/0114    
					error((char*)s->err->rptr); 
1991/0411    
				else if(s->hread++<3) 
1990/0331    
					break; 
				else 
1990/11211    
					error(Ehungup); 
1990/0331    
			} 
1990/1212    
			q->rp = &q->r; 
1992/0207    
			sleep(q->rp, isinput, (void *)q); 
1991/0809    
			continue; 
		} 
 
1990/0312    
		i = BLEN(bp); 
1990/0227    
		if(i <= left){ 
1991/0318    
			memmove(buf, bp->rptr, i); 
1990/0227    
			left -= i; 
			buf += i; 
1991/0811    
			bp->next = tofree; 
			tofree = bp; 
			if(bp->flags & S_DELIM) 
1990/0227    
				break; 
		} else { 
1991/0318    
			memmove(buf, bp->rptr, left); 
1990/0227    
			bp->rptr += left; 
			putbq(q, bp); 
			left = 0; 
		} 
1991/0501    
	} 
1991/0811    
 
	/* 
	 *  free completely read blocks 
	 */ 
	if(tofree) 
		freeb(tofree); 
1990/0227    
 
	qunlock(&s->rdlock); 
	poperror(); 
	return n - left;	 
} 
 
/* 
1990/1202    
 *  look for an instance of the line discipline `name' on 
 *  the stream `s' 
 */ 
void 
qlook(Stream *s, char *name) 
{ 
	Queue *q; 
 
	for(q = s->procq; q; q = q->next){ 
		if(strcmp(q->info->name, name) == 0) 
			return; 
 
		/* 
		 *  this may be 2 streams joined device end to device end 
		 */ 
		if(q == s->devq->other) 
			break; 
	} 
1992/0114    
	error(Ebadarg); 
1990/1202    
} 
 
/* 
1990/0227    
 *  Handle a ctl request.  Streamwide requests are: 
 * 
 *	hangup			-- send an M_HANGUP up the stream 
 *	push ldname		-- push the line discipline named ldname 
 *	pop			-- pop a line discipline 
1990/1202    
 *	look ldname		-- look for a line discipline 
1990/0227    
 * 
1991/1105    
 *  This routing is entered with s->wrlock'ed and must unlock. 
1990/0227    
 */ 
static long 
1990/0930    
streamctlwrite(Chan *c, void *a, long n) 
1990/0227    
{ 
	Qinfo *qi; 
	Block *bp; 
1990/0930    
	Stream *s; 
1990/0227    
 
1990/11211    
	if(STREAMTYPE(c->qid.path) != Sctlqid) 
1990/0930    
		panic("streamctlwrite %lux", c->qid); 
	s = c->stream; 
 
1990/0227    
	/* 
	 *  package 
	 */ 
	bp = allocb(n+1); 
1991/0318    
	memmove(bp->wptr, a, n); 
1990/0227    
	bp->wptr[n] = 0; 
	bp->wptr += n + 1; 
 
	/* 
	 *  check for standard requests 
	 */ 
	if(streamparse("hangup", bp)){ 
		hangup(s); 
		freeb(bp); 
	} else if(streamparse("push", bp)){ 
		qi = qinfofind((char *)bp->rptr); 
1992/0318    
		if(qi == 0) 
			error(Ebadld); 
1990/0227    
		pushq(s, qi); 
		freeb(bp); 
	} else if(streamparse("pop", bp)){ 
		popq(s); 
1990/1202    
		freeb(bp); 
	} else if(streamparse("look", bp)){ 
		qlook(s, (char *)bp->rptr); 
1990/0227    
		freeb(bp); 
	} else { 
		bp->type = M_CTL; 
		bp->flags |= S_DELIM; 
		PUTNEXT(s->procq, bp); 
	} 
 
	return n; 
} 
 
/* 
 *  wait till there's room in the next stream 
 */ 
static int 
notfull(void *arg) 
{ 
1990/0406    
	return !QFULL((Queue *)arg); 
1990/0227    
} 
void 
1992/0305    
flowctl(Queue *q, Block *bp) 
1990/0227    
{ 
1992/0305    
	if(bp->type != M_HANGUP){ 
		qlock(&q->rlock); 
		if(waserror()){ 
			qunlock(&q->rlock); 
			freeb(bp); 
			nexterror(); 
		} 
		q->rp = &q->r; 
		sleep(q->rp, notfull, q->next); 
1990/1113    
		qunlock(&q->rlock); 
1992/0305    
		poperror(); 
1990/1113    
	} 
1992/0305    
	PUTNEXT(q, bp); 
1990/0227    
} 
 
/* 
 *  send the request as a single delimited block 
 */ 
long 
1990/0312    
streamwrite(Chan *c, void *a, long n, int docopy) 
1990/0227    
{ 
	Stream *s; 
	Queue *q; 
	long rem; 
	int i; 
1991/0926    
	Block *bp, *first, *last; 
1990/0227    
 
1990/0911    
	s = c->stream; 
 
1990/0227    
	/* 
	 *  decode the qid 
	 */ 
1990/11211    
	if(STREAMTYPE(c->qid.path) != Sdataqid) 
1990/0930    
		return streamctlwrite(c, a, n); 
1990/0227    
 
	/* 
	 *  No writes allowed on hungup channels 
	 */ 
	q = s->procq; 
1991/0411    
	if(q->other->flag & QHUNGUP){ 
		if(s->err) 
1992/0114    
			error((char*)(s->err->rptr)); 
1991/0411    
		else 
			error(Ehungup); 
	} 
1990/0227    
 
1991/0502    
	/* 
1991/0926    
	 *  copy the whole write into kernel space 
1991/0502    
	 */ 
1991/0926    
	first = last = 0; 
	for(rem = n; ; rem -= i) { 
		bp = allocb(rem); 
		i = bp->lim - bp->wptr; 
		if(i >= rem) 
			i = rem; 
		memmove(bp->wptr, a, i); 
		bp->wptr += i; 
1991/0502    
		bp->type = M_DATA; 
1991/0926    
		a = ((char*)a) + i; 
		if(first == 0) 
			first = bp; 
		else 
			last->next = bp; 
		last = bp; 
		if(i == rem) 
			break; 
1991/0502    
	} 
 
1991/0926    
	/* 
	 *  send it down stream 
	 */ 
	last->flags |= S_DELIM; 
1992/0305    
	FLOWCTL(q, first); 
1990/0227    
	return n; 
1990/0312    
} 
 
/* 
 *  like andrew's getmfields but no hidden state 
 */ 
int 
1991/1027    
getfields(char *lp, char **fields, int n, char sep) 
1990/0312    
{ 
	int i; 
 
	for(i=0; lp && *lp && i<n; i++){ 
		while(*lp == sep) 
			*lp++=0; 
		if(*lp == 0) 
			break; 
		fields[i]=lp; 
		while(*lp && *lp != sep) 
			lp++; 
	} 
	return i; 
1990/0801    
} 
 
/* 
 *  stat a stream.  the length is the number of bytes up to the 
 *  first delimiter. 
 */ 
void 
streamstat(Chan *c, char *db, char *name) 
{ 
	Dir dir; 
	Stream *s; 
	Queue *q; 
	Block *bp; 
	long n; 
 
	s = c->stream; 
	if(s == 0) 
1990/0914    
		n = 0; 
	else { 
		q = RD(s->procq); 
		lock(q); 
		for(n=0, bp=q->first; bp; bp = bp->next){ 
			n += BLEN(bp); 
			if(bp->flags&S_DELIM) 
				break; 
		} 
		unlock(q); 
1990/0801    
	} 
 
1991/1109    
	devdir(c, c->qid, name, n, eve, 0, &dir); 
1990/0801    
	convD2M(&dir, db); 
1992/0318    
} 
 
Block * 
copyb(Block *bp, int count) 
{ 
	Block *nb, *head, **p; 
	int l; 
 
	p = &head; 
	while(count) { 
		l = BLEN(bp); 
		if(count < l) 
			l = count; 
		nb = allocb(l); 
		if(nb == 0) 
			panic("copyb.1"); 
		memmove(nb->wptr, bp->rptr, l); 
		nb->wptr += l; 
		count -= l; 
		if(bp->flags & S_DELIM) 
			nb->flags |= S_DELIM; 
		*p = nb; 
		p = &nb->next; 
		bp = bp->next; 
		if(bp == 0) 
			break; 
	} 
	if(count) { 
		nb = allocb(count); 
		if(nb == 0) 
			panic("copyb.2"); 
		memset(nb->wptr, 0, count); 
		nb->wptr += count; 
		nb->flags |= S_DELIM; 
		*p = nb; 
	} 
	if(blen(head) == 0) 
		print("copyb: zero length\n"); 
 
	return head; 
1990/0911    
} 
 
/* 
 *  Dump all block information of how many blocks are in which queues 
 */ 
void 
dumpblocks(Queue *q, char c) 
{ 
	Block *bp; 
	uchar *cp; 
 
	lock(q); 
	for(bp = q->first; bp; bp = bp->next){ 
1992/0319    
		print("%c %c%d%c", c, bp->type == M_DATA ? 'd' : 'c', 
			bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' '); 
		for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+30; cp++) 
			print(" %.2x", *cp); 
1990/0911    
		print("\n"); 
	} 
	unlock(q); 
} 
 
void 
dumpqueues(void) 
{ 
	Queue *q; 
1991/0317    
	int count, qcount; 
1990/0911    
	Block *bp; 
	Bclass *bcp; 
1992/0326    
 
1990/0911    
	print("\n"); 
1991/0317    
	qcount = 0; 
1990/0911    
	for(q = qlist; q < qlist + conf.nqueue; q++, q++){ 
		if(!(q->flag & QINUSE)) 
			continue; 
1991/0317    
		qcount++; 
1992/0319    
		print("%10s %ux  R n %d l %d f %ux r %ux ", 
			q->info->name, q, 
1991/0323    
			q->nb, q->len, q->flag, &(q->r)); 
1992/0319    
		print("  W n %d l %d f %ux r %ux next %lux put %lux Rz %lux",  
1991/0320    
			WR(q)->nb, WR(q)->len, 
			WR(q)->flag, &(WR(q)->r), q->next, q->put, q->rp); 
1992/0319    
		print("\n"); 
1990/0911    
		dumpblocks(q, 'R'); 
		dumpblocks(WR(q), 'W'); 
	} 
1991/0317    
	print("%d queues\n", qcount); 
1990/0911    
	for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){ 
		for(count = 0, bp = bcp->first; bp; count++, bp = bp->next) 
			; 
1991/0831    
		print("%d byte blocks: %d made %d free\n", bcp->size, 
			bcp->made, count); 
1990/0911    
	} 
	print("\n"); 
1990/0227    
} 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)