plan 9 kernel history: overview | file list | diff list

1999/0603/port/qio.c (diff list | history)

1999/0603/sys/src/9/port/qio.c:1,1217 → 1999/0714/sys/src/9/port/qio.c:1,1217 (short | long | prev | next)
Bug fix: called wrong function name.
rsc Fri Mar 4 12:44:25 2005
1993/0526    
#include	"u.h" 
#include	"../port/lib.h" 
#include	"mem.h" 
#include	"dat.h" 
#include	"fns.h" 
#include	"../port/error.h" 
 
1997/0327    
/*
 *  statistics counters, printed by ixsummary()
 */
static ulong padblockcnt; 	/* replacement blocks allocated by padblock */
static ulong concatblockcnt; 	/* bytes merged by concatblock */
static ulong pullupblockcnt; 	/* copies performed by pullupblock */
static ulong copyblockcnt; 	/* calls to copyblock */
static ulong consumecnt; 	/* sum of head block lengths seen by qconsume */
static ulong producecnt; 	/* bytes queued by qproduce */
static ulong qcopycnt; 	/* bytes copied by qcopy */
 
1998/0605    
static int debugging; 	/* toggled by ixsummary; not read elsewhere in this listing */
1994/0323    
 
1997/1105    
/* statement prefix: expands to if(0), so the statement that follows never runs */
#define QDEBUG	if(0) 
1994/0902    
 
1993/0526    
/* 
1993/0530    
 *  IO queues 
 *
 *  A Queue is a singly linked list of Blocks (bfirst..blast) plus
 *  byte accounting, a state word of Q* flags and the locks and
 *  rendezvous points used by readers and writers.
 */ 
typedef struct Queue	Queue; 
 
struct Queue 
{ 
	Lock; 		/* taken with ilock(q)/iunlock(q) around list and counter updates */
 
1995/0125    
	Block*	bfirst;		/* buffer */ 
1994/0321    
	Block*	blast; 		/* last block of the list; only meaningful while bfirst != 0 */
1993/0530    
 
1998/0922    
	int	len;		/* bytes allocated to queue */ 
	int	dlen;		/* data bytes in queue */ 
1993/0530    
	int	limit;		/* max bytes in queue */ 
1994/0902    
	int	inilim;		/* initial limit */ 
1993/0530    
	int	state; 		/* or of Qstarve, Qmsg, Qclosed, Qflow */
1994/0902    
	int	noblock;	/* true if writes return immediately when q full */ 
1993/0908    
	int	eof;		/* number of eofs read by user */ 
1993/0530    
 
	void	(*kick)(void*);	/* restart output */ 
1994/0321    
	void*	arg;		/* argument to kick */ 
1993/0530    
 
	QLock	rlock;		/* mutex for reading processes */ 
	Rendez	rr;		/* process waiting to read */ 
	QLock	wlock;		/* mutex for writing processes */ 
	Rendez	wr;		/* process waiting to write */ 
1997/0327    
 
	char	err[ERRLEN]; 	/* error string set by qclose/qhangup, raised by qbread/qbwrite */
1993/0530    
}; 
 
enum 
{ 
1997/0327    
	/* Queue.state */ 
1994/0507    
	Qstarve		= (1<<0),	/* consumer starved */ 
	Qmsg		= (1<<1),	/* message stream */ 
	Qclosed		= (1<<2), 	/* set by qclose/qhangup, cleared by qreopen */
	Qflow		= (1<<3), 	/* producer flow controlled */
1997/0925    
 
1999/0219    
	Maxatomic	= 32*1024, 	/* largest block qwrite/qiwrite will build */
1993/0530    
}; 
 
1993/0811    
void 
1997/0925    
ixsummary(void) 
{ 
1998/0605    
	debugging ^= 1; 
1999/0527    
	iallocsummary(); 
1997/0925    
	print("pad %lud, concat %lud, pullup %lud, copy %lud\n", 
		padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt); 
	print("consume %lud, produce %lud, qcopy %lud\n", 
		consumecnt, producecnt, qcopycnt); 
1993/0526    
} 
 
1997/0327    
/* 
 *  free a list of blocks 
 */ 
1993/0526    
void 
1997/0327    
freeblist(Block *b) 
{ 
	Block *next; 
 
	for(; b != 0; b = next){ 
		next = b->next; 
		b->next = 0; 
		freeb(b); 
	} 
} 
 
/*
 *  make room for size bytes in front of the data (size >= 0)
 *  or -size bytes behind it (size < 0).  If the block already
 *  has the room it is returned (with rp moved back for front
 *  padding); otherwise the data is copied to a new block and
 *  the old one is freed.  Only single blocks may be reallocated.
 */
Block*
padblock(Block *bp, int size)
{
	int datalen;
	Block *fresh;

	QDEBUG checkb(bp, "padblock 1");
	if(size < 0){
		/* padding at the back */
		size = -size;

		if(bp->next)
			panic("padblock 0x%uX", getcallerpc(&bp));

		if(bp->lim - bp->wp >= size)
			return bp;

		datalen = BLEN(bp);
		padblockcnt++;
		fresh = allocb(size+datalen);
		memmove(fresh->wp, bp->rp, datalen);
		fresh->wp += datalen;
		freeb(bp);
	} else {
		/* padding at the front */
		if(bp->rp - bp->base >= size){
			bp->rp -= size;
			return bp;
		}

		if(bp->next)
			panic("padblock 0x%uX", getcallerpc(&bp));

		datalen = BLEN(bp);
		padblockcnt++;
		fresh = allocb(size+datalen);

		/* lay the data down size bytes in, then back rp up over the pad */
		fresh->rp += size;
		fresh->wp = fresh->rp;
		memmove(fresh->wp, bp->rp, datalen);
		fresh->wp += datalen;
		freeb(bp);
		fresh->rp -= size;
	}
	QDEBUG checkb(fresh, "padblock 1");
	return fresh;
}
 
/*
 *  total number of data bytes held by a list of blocks
 */
int
blocklen(Block *bp)
{
	int total;

	for(total = 0; bp != 0; bp = bp->next)
		total += BLEN(bp);
	return total;
}
 
/*
 *  gather the data of a list of blocks into one newly
 *  allocated block and free the list.  A list of one
 *  block is returned untouched.
 */
Block*
concatblock(Block *bp)
{
	int n;
	Block *cur, *merged;

	if(bp->next == 0)
		return bp;

	merged = allocb(blocklen(bp));
	cur = bp;
	while(cur != 0){
		n = BLEN(cur);
		memmove(merged->wp, cur->rp, n);
		merged->wp += n;
		cur = cur->next;
	}
	concatblockcnt += BLEN(merged);
	freeblist(bp);
	QDEBUG checkb(merged, "concatblock 1");
	return merged;
}
 
/*
 *  arrange for the first block of the list to hold at
 *  least n bytes, pulling data forward from the blocks
 *  behind it.  Returns the (possibly new) head, or 0
 *  after freeing the head if the list runs out of data.
 */
Block*
pullupblock(Block *bp, int n)
{
	int avail;
	Block *src;

	/* the common case: nothing to do */
	if(BLEN(bp) >= n)
		return bp;

	/* the head cannot hold n bytes: put a big enough block in front */
	if(bp->lim - bp->rp < n){
		src = allocb(n);
		src->next = bp;
		bp = src;
	}

	/* n is now the number of bytes still to be pulled forward */
	n -= BLEN(bp);
	for(src = bp->next; src != 0; src = bp->next){
		avail = BLEN(src);
		if(avail > n){
			/* this block has more than we need; take part of it */
			memmove(bp->wp, src->rp, n);
			pullupblockcnt++;
			bp->wp += n;
			src->rp += n;
			QDEBUG checkb(bp, "pullupblock 1");
			return bp;
		}

		/* swallow the whole block and unlink it */
		memmove(bp->wp, src->rp, avail);
		pullupblockcnt++;
		bp->wp += avail;
		bp->next = src->next;
		src->next = 0;
		freeb(src);
		n -= avail;
		if(n == 0){
			QDEBUG checkb(bp, "pullupblock 2");
			return bp;
		}
	}
	freeb(bp);
	return 0;
}
 
/*
 *  cut a list of blocks down to the len bytes that start
 *  offset bytes in.  Blocks wholly before or after that
 *  range are freed.  If the list is shorter than offset+len
 *  the whole list is freed and nil is returned.
 */
Block *
trimblock(Block *bp, int offset, int len)
{
	ulong blen;
	Block *rest, *head;

	QDEBUG checkb(bp, "trimblock 1");
	if(blocklen(bp) < offset+len) {
		freeblist(bp);
		return nil;
	}

	/* discard leading blocks that end before offset */
	for(blen = BLEN(bp); blen < offset; blen = BLEN(bp)){
		offset -= blen;
		rest = bp->next;
		bp->next = nil;
		freeb(bp);
		bp = rest;
	}

	head = bp;
	bp->rp += offset;

	/* walk to the block holding the last wanted byte */
	for(blen = BLEN(bp); blen < len; blen = BLEN(bp)){
		len -= blen;
		bp = bp->next;
	}

	bp->wp -= (BLEN(bp) - len);

	if(bp->next) {
		freeblist(bp->next);
		bp->next = nil;
	}

	return head;
}
 
/*
 *  build a new block of exactly count bytes from the
 *  front of a list of blocks; if the list holds fewer
 *  bytes the remainder is zero filled.  The list itself
 *  is left alone.
 */
Block*
copyblock(Block *bp, int count)
{
	int chunk;
	Block *copy;

	QDEBUG checkb(bp, "copyblock 0");
	copy = allocb(count);
	while(count > 0 && bp != 0){
		chunk = BLEN(bp);
		if(chunk > count)
			chunk = count;
		memmove(copy->wp, bp->rp, chunk);
		copy->wp += chunk;
		count -= chunk;
		bp = bp->next;
	}
	if(count > 0){
		memset(copy->wp, 0, count);
		copy->wp += count;
	}
	copyblockcnt++;
	QDEBUG checkb(copy, "copyblock 1");

	return copy;
}
 
1997/0925    
/*
 *  make the block hold exactly len bytes: truncate it, or
 *  extend it with zeros.  If the block is too small a copy
 *  is returned and the original freed.  A negative len
 *  frees the block and returns nil.
 */
Block*
adjustblock(Block* bp, int len)
{
	int have;
	Block *bigger;

	if(len < 0){
		freeb(bp);
		return nil;
	}

	if(bp->rp+len <= bp->lim){
		/* fits in place */
		have = BLEN(bp);
		if(have < len)
			memset(bp->wp, 0, len-have);
		bp->wp = bp->rp+len;
		QDEBUG checkb(bp, "adjustblock 2");

		return bp;
	}

	bigger = copyblock(bp, len);
	freeblist(bp);
	QDEBUG checkb(bigger, "adjustblock 1");

	return bigger;
}
 
 
1997/0327    
/*
 *  drop up to count bytes from the front of the list
 *  at *bph, freeing blocks as they empty and advancing
 *  *bph past them.  Returns the number of bytes dropped.
 */
int
pullblock(Block **bph, int count)
{
	Block *cur;
	int take, total;

	if(bph == nil)
		return 0;

	total = 0;
	for(;;){
		cur = *bph;
		if(cur == nil || count == 0)
			break;
		take = BLEN(cur);
		if(take > count)
			take = count;
		total += take;
		count -= take;
		cur->rp += take;
		QDEBUG checkb(cur, "pullblock ");
		if(BLEN(cur) == 0) {
			*bph = cur->next;
			cur->next = nil;
			freeb(cur);
		}
	}
	return total;
}
 
/* 
1997/0327    
 *  get next block from a queue, return null if nothing there 
1995/0902    
 */ 
Block* 
1997/0327    
qget(Queue *q) 
1995/0902    
{ 
1997/0327    
	int dowakeup; 
	Block *b; 
1995/0902    
 
1997/0327    
	/* sync with qwrite */ 
	ilock(q); 
1995/0902    
 
1997/0327    
	b = q->bfirst; 
	if(b == 0){ 
		q->state |= Qstarve; 
		iunlock(q); 
		return 0; 
	} 
	q->bfirst = b->next; 
	b->next = 0; 
1998/0918    
	q->len -= BALLOC(b); 
1998/0922    
	q->dlen -= BLEN(b); 
1998/0605    
	QDEBUG checkb(b, "qget"); 
1995/1121    
 
1997/0327    
	/* if writer flow controlled, restart */ 
	if((q->state & Qflow) && q->len < q->limit/2){ 
		q->state &= ~Qflow; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1995/1121    
 
1997/0327    
	iunlock(q); 
 
	if(dowakeup) 
		wakeup(&q->wr); 
 
	return b; 
} 
 
/*
 *  drop the first len bytes of queued data (or all of
 *  it if there is less), then wake a flow controlled
 *  writer if the queue has drained below half its limit
 */
void
qdiscard(Queue *q, int len)
{
	Block *head;
	int restart, n, done;

	ilock(q);
	done = 0;
	while(done < len){
		head = q->bfirst;
		if(head == nil)
			break;
		QDEBUG checkb(head, "qdiscard");
		n = BLEN(head);
		if(n > len - done){
			/* only part of this block goes */
			n = len - done;
			head->rp += n;
			q->dlen -= n;
		} else {
			/* the whole block goes */
			q->bfirst = head->next;
			head->next = 0;
			q->len -= BALLOC(head);
			q->dlen -= BLEN(head);
			freeb(head);
		}
		done += n;
	}

	/* restart a flow controlled writer */
	restart = 0;
	if((q->state & Qflow) && q->len < q->limit/2){
		q->state &= ~Qflow;
		restart = 1;
	}

	iunlock(q);

	if(restart)
		wakeup(&q->wr);
}
 
/* 
1994/0322    
 *  Interrupt level copy out of a queue, return # bytes copied. 
1993/0526    
 */ 
int 
1993/0601    
qconsume(Queue *q, void *vp, int len) 
1993/0526    
{ 
	Block *b; 
1993/0528    
	int n, dowakeup; 
1993/0601    
	uchar *p = vp; 
1999/0115    
	Block *tofree = nil; 
1993/0526    
 
1993/0527    
	/* sync with qwrite */ 
1997/0410    
	ilock(q); 
1993/0527    
 
1994/1124    
	for(;;) { 
		b = q->bfirst; 
		if(b == 0){ 
			q->state |= Qstarve; 
1997/0410    
			iunlock(q); 
1994/1124    
			return -1; 
		} 
		QDEBUG checkb(b, "qconsume 1"); 
1993/0528    
 
1994/1124    
		n = BLEN(b); 
		if(n > 0) 
			break; 
		q->bfirst = b->next; 
1998/0918    
		q->len -= BALLOC(b); 
1999/0115    
 
		/* remember to free this */ 
		b->next = tofree; 
		tofree = b; 
1994/1124    
	}; 
 
1993/0526    
	if(n < len) 
		len = n; 
	memmove(p, b->rp, len); 
1997/0327    
	consumecnt += n; 
1993/0527    
	if((q->state & Qmsg) || len == n) 
1993/0526    
		q->bfirst = b->next; 
1994/0222    
	b->rp += len; 
1998/0922    
	q->dlen -= len; 
1993/0526    
 
1999/0115    
	/* discard the block if we're done with it */ 
	if((q->state & Qmsg) || len == n){ 
		b->next = 0; 
		q->len -= BALLOC(b); 
		q->dlen -= BLEN(b); 
 
		/* remember to free this */ 
		b->next = tofree; 
		tofree = b; 
	} 
 
1993/0528    
	/* if writer flow controlled, restart */ 
	if((q->state & Qflow) && q->len < q->limit/2){ 
		q->state &= ~Qflow; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1993/0526    
 
1997/0410    
	iunlock(q); 
1993/0526    
 
1993/0528    
	if(dowakeup) 
		wakeup(&q->wr); 
 
1999/0115    
	if(tofree != nil) 
		freeblist(tofree); 
 
1993/0526    
	return len; 
} 
 
1993/0528    
int 
1994/0311    
qpass(Queue *q, Block *b) 
{ 
1998/0922    
	int dlen, len, dowakeup; 
1994/0311    
 
	/* sync with qread */ 
	dowakeup = 0; 
1994/0323    
	ilock(q); 
1997/0404    
	if(q->len >= q->limit){ 
1998/0918    
		freeblist(b); 
1997/0404    
		iunlock(q); 
		return -1; 
	} 
1994/0311    
 
1997/0327    
	/* add buffer to queue */ 
1994/0311    
	if(q->bfirst) 
		q->blast->next = b; 
	else 
		q->bfirst = b; 
1998/0918    
	len = BALLOC(b); 
1998/0922    
	dlen = BLEN(b); 
1997/0327    
	QDEBUG checkb(b, "qpass"); 
	while(b->next){ 
		b = b->next; 
		QDEBUG checkb(b, "qpass"); 
1998/0918    
		len += BALLOC(b); 
1998/0922    
		dlen += BLEN(b); 
1997/0327    
	} 
1994/0311    
	q->blast = b; 
	q->len += len; 
1998/0922    
	q->dlen += dlen; 
1994/0311    
 
1994/0418    
	if(q->len >= q->limit/2) 
1994/0324    
		q->state |= Qflow; 
 
1994/0311    
	if(q->state & Qstarve){ 
		q->state &= ~Qstarve; 
		dowakeup = 1; 
	} 
1994/0323    
	iunlock(q); 
1994/0311    
 
1994/1117    
	if(dowakeup) 
1994/0311    
		wakeup(&q->rr); 
 
	return len; 
} 
 
int 
1998/0918    
qpassnolim(Queue *q, Block *b) 
{ 
1998/0922    
	int dlen, len, dowakeup; 
1998/0918    
 
	/* sync with qread */ 
	dowakeup = 0; 
	ilock(q); 
 
	/* add buffer to queue */ 
	if(q->bfirst) 
		q->blast->next = b; 
	else 
		q->bfirst = b; 
	len = BALLOC(b); 
1998/0922    
	dlen = BLEN(b); 
1998/0918    
	QDEBUG checkb(b, "qpass"); 
	while(b->next){ 
		b = b->next; 
		QDEBUG checkb(b, "qpass"); 
		len += BALLOC(b); 
1998/0922    
		dlen += BLEN(b); 
1998/0918    
	} 
	q->blast = b; 
	q->len += len; 
1998/0922    
	q->dlen += dlen; 
1998/0918    
 
	if(q->len >= q->limit/2) 
		q->state |= Qflow; 
 
	if(q->state & Qstarve){ 
		q->state &= ~Qstarve; 
		dowakeup = 1; 
	} 
	iunlock(q); 
 
	if(dowakeup) 
		wakeup(&q->rr); 
 
	return len; 
} 
 
/*
 *  walk a list of blocks and replace any block that uses
 *  less than a quarter of its allocation with a copy
 *  that is just big enough for its data
 */
Block*
packblock(Block *bp)
{
	Block **link, *old, *small;
	int used;

	link = &bp;
	while(*link != nil){
		old = *link;
		used = BLEN(old);
		if((used<<2) < BALLOC(old)){
			small = allocb(used);
			memmove(small->wp, old->rp, used);
			small->wp += used;
			small->next = old->next;
			*link = small;
			freeb(old);
		}
		link = &(*link)->next;
	}

	return bp;
}
 
int 
1993/0601    
qproduce(Queue *q, void *vp, int len) 
1993/0526    
{ 
	Block *b; 
1995/0714    
	int dowakeup; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
1993/0527    
	/* sync with qread */ 
1994/0208    
	dowakeup = 0; 
1997/0410    
	ilock(q); 
1993/0527    
 
1993/0526    
	/* no waiting receivers, room in buffer? */ 
	if(q->len >= q->limit){ 
1994/1116    
		q->state |= Qflow; 
1997/0410    
		iunlock(q); 
1993/0526    
		return -1; 
	} 
 
	/* save in buffer */ 
1994/0222    
	b = iallocb(len); 
	if(b == 0){ 
1997/0410    
		iunlock(q); 
1994/1117    
		return 0; 
1993/0526    
	} 
1994/0222    
	memmove(b->wp, p, len); 
1997/0327    
	producecnt += len; 
1994/0222    
	b->wp += len; 
	if(q->bfirst) 
		q->blast->next = b; 
	else 
		q->bfirst = b; 
	q->blast = b; 
1997/0327    
	/* b->next = 0; done by iallocb() */ 
1998/0918    
	q->len += BALLOC(b); 
1998/0922    
	q->dlen += BLEN(b); 
1994/0902    
	QDEBUG checkb(b, "qproduce"); 
1994/0222    
 
1993/0528    
	if(q->state & Qstarve){ 
		q->state &= ~Qstarve; 
		dowakeup = 1; 
1994/0208    
	} 
1999/0115    
 
	if(q->len >= q->limit) 
		q->state |= Qflow; 
1997/0410    
	iunlock(q); 
1993/0527    
 
1994/1117    
	if(dowakeup) 
1993/0528    
		wakeup(&q->rr); 
 
1993/0526    
	return len; 
} 
 
/* 
1997/0327    
 *  copy from offset in the queue 
 *
 *  Returns a new block holding up to len bytes of the queued data
 *  that starts offset bytes in.  Nothing is removed from the queue.
 *  The block is shorter than len (possibly empty) if the queue does
 *  not hold that much data past offset.
 */ 
Block* 
qcopy(Queue *q, int len, ulong offset) 
{ 
	int sofar; 
	int n; 
	Block *b, *nb; 
	uchar *p; 
 
	/* allocate before taking the queue lock */
	nb = allocb(len); 
 
1997/0410    
	ilock(q); 
1997/0327    
 
	/* go to offset */ 
	b = q->bfirst; 
	for(sofar = 0; ; sofar += n){ 
		if(b == nil){ 
			/* offset is beyond the queued data: return the empty block */
1997/0410    
			iunlock(q); 
1997/0327    
			return nb; 
		} 
		n = BLEN(b); 
		if(sofar + n > offset){ 
			/* offset falls in this block: p is the first byte, n what follows it here */
			p = b->rp + offset - sofar; 
			n -= offset - sofar; 
			break; 
		} 
1998/0605    
		QDEBUG checkb(b, "qcopy"); 
1997/0327    
		b = b->next; 
	} 
 
	/* copy bytes from there */ 
	for(sofar = 0; sofar < len;){ 
		if(n > len - sofar) 
			n = len - sofar; 
		memmove(nb->wp, p, n); 
		qcopycnt += n; 
		sofar += n; 
		nb->wp += n; 
		b = b->next; 
		if(b == nil) 
			break; 
		n = BLEN(b); 
		p = b->rp; 
	} 
1997/0410    
	iunlock(q); 
1997/0327    
 
	return nb; 
} 
 
/* 
1993/0526    
 *  called by non-interrupt code 
 */ 
Queue* 
1993/0530    
qopen(int limit, int msg, void (*kick)(void*), void *arg) 
1993/0526    
{ 
	Queue *q; 
 
	q = malloc(sizeof(Queue)); 
	if(q == 0) 
1993/0528    
		return 0; 
 
1998/0328    
	ilock(q); 
1994/0902    
	q->limit = q->inilim = limit; 
1993/0526    
	q->kick = kick; 
	q->arg = arg; 
1993/0530    
	q->state = msg ? Qmsg : 0; 
1993/0601    
	q->state |= Qstarve; 
1993/0908    
	q->eof = 0; 
1998/0703    
	q->noblock = 0; 
1998/0328    
	iunlock(q); 
1993/0526    
 
	return q; 
} 
 
static int 
1993/0528    
notempty(void *a) 
1993/0526    
{ 
1993/0528    
	Queue *q = a; 
1993/0526    
 
1994/0323    
	return (q->state & Qclosed) || q->bfirst != 0; 
1993/0526    
} 
 
1993/0528    
/* 
1995/0714    
 *  get next block from a queue (up to a limit) 
 *
 *  Sleeps until a block is queued or the queue is closed.  Returns a
 *  block of at most len bytes.  On a closed, empty queue returns 0;
 *  once that has happened more than 3 times q->err is raised instead.
 *  On a stream queue the unread tail of an oversized block is put
 *  back at the head of the queue; on a message queue it is dropped.
1993/0528    
 */ 
1995/0714    
Block* 
qbread(Queue *q, int len) 
1993/0526    
{ 
1995/0714    
	Block *b, *nb; 
1994/1116    
	int n, dowakeup; 
1993/0526    
 
	/* one reader at a time; release the qlock if an error is raised below */
	qlock(&q->rlock); 
1993/0527    
	if(waserror()){ 
		qunlock(&q->rlock); 
		nexterror(); 
	} 
1993/0526    
 
1993/0528    
	/* wait for data */ 
	for(;;){ 
		/* sync with qwrite/qproduce */ 
1994/0323    
		ilock(q); 
1993/0526    
 
1993/0728    
		b = q->bfirst; 
1997/0327    
		if(b){ 
			QDEBUG checkb(b, "qbread 0"); 
			/* leave the loop with the queue still ilock'ed */
1993/0728    
			break; 
1997/0327    
		} 
1993/0728    
 
1993/0528    
		if(q->state & Qclosed){ 
1994/0323    
			iunlock(q); 
1993/0908    
			poperror(); 
			qunlock(&q->rlock); 
			/* count this eof; the first 3 return 0, later ones raise q->err */
			if(++q->eof > 3) 
1997/0327    
				error(q->err); 
1993/0528    
			return 0; 
1993/0527    
		} 
 
1995/0714    
		q->state |= Qstarve;	/* flag requesting producer to wake me */ 
		iunlock(q); 
		sleep(&q->rr, notempty, q); 
1993/0526    
	} 
 
1993/0528    
	/* remove a buffered block */ 
	q->bfirst = b->next; 
1997/0327    
	b->next = 0; 
1993/0528    
	n = BLEN(b); 
1998/0922    
	q->dlen -= n; 
1998/0918    
	q->len -= BALLOC(b); 
1997/0327    
	QDEBUG checkb(b, "qbread 1"); 
1993/0528    
 
1997/1102    
	/* split block if it's too big and this is not a message-oriented queue */ 
1995/0714    
	nb = b; 
	if(n > len){ 
		if((q->state&Qmsg) == 0){ 
			/* the queue lock is dropped around allocb() and retaken afterwards */
1995/0917    
			iunlock(q); 
 
1995/0714    
			n -= len; 
			b = allocb(n); 
			memmove(b->wp, nb->rp+len, n); 
			b->wp += n; 
1993/0526    
 
1995/0917    
			ilock(q); 
			/* put the leftover back at the head of the queue */
1995/0714    
			b->next = q->bfirst; 
			if(q->bfirst == 0) 
				q->blast = b; 
			q->bfirst = b; 
1998/0918    
			q->len += BALLOC(b); 
1998/0922    
			q->dlen += n; 
1995/0714    
		} 
		/* truncate what is returned to len bytes */
		nb->wp = nb->rp + len; 
1993/0526    
	} 
1999/0115    
 
	/* if writer flow controlled, restart */ 
	if((q->state & Qflow) && q->len < q->limit/2){ 
		q->state &= ~Qflow; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1993/0527    
 
1995/0714    
	iunlock(q); 
 
	/* wakeup flow controlled writers */ 
1994/0324    
	if(dowakeup){ 
		if(q->kick) 
1997/0327    
			q->kick(q->arg); 
1993/0528    
		wakeup(&q->wr); 
1994/0324    
	} 
1993/0528    
 
1993/0527    
	poperror(); 
1993/0526    
	qunlock(&q->rlock); 
1995/0714    
	return nb; 
1993/0526    
} 
 
1995/0714    
/*
 *  read up to len bytes from a queue into vp.  The work
 *  is done by qbread; the block it returns is copied out
 *  and freed.  Returns the number of bytes copied, or 0
 *  if qbread returned no block.
 */
long
qread(Queue *q, void *vp, int len)
{
	Block *b;
	int got;

	b = qbread(q, len);
	if(b == 0)
		return 0;

	got = BLEN(b);
	memmove(vp, b->rp, got);
	freeb(b);
	return got;
}
 
1993/0528    
static int 
qnotfull(void *a) 
1993/0526    
{ 
1993/0528    
	Queue *q = a; 
1993/0526    
 
1994/0215    
	return q->len < q->limit || (q->state & Qclosed); 
1993/0528    
} 
1993/0527    
 
1993/0528    
/* 
1995/0714    
 *  add a block to a queue obeying flow control 
 *
 *  Returns the number of data bytes in b as measured on entry.
 *  If the queue is closed, b is freed and q->err is raised.
 *  If the queue is full: with noblock set b is freed and its length
 *  returned as if it had been written; otherwise the caller sleeps
 *  until qnotfull() holds.
1993/0528    
 */ 
long 
1995/0714    
qbwrite(Queue *q, Block *b) 
1993/0528    
{ 
1995/0714    
	int n, dowakeup; 
1993/0526    
 
1994/0208    
	dowakeup = 0; 
1995/0714    
	n = BLEN(b); 
	/* one writer at a time; release the qlock if an error is raised below */
	/* NOTE(review): if sleep() raises an error, b does not appear to be freed on that path — confirm */
1998/0328    
	qlock(&q->wlock); 
1994/0208    
	if(waserror()){ 
		qunlock(&q->wlock); 
		nexterror(); 
1994/0804    
	} 
1994/0208    
 
1995/0714    
	/* flow control */ 
1998/0328    
	for(;;){ 
		ilock(q); 
 
		if(q->state & Qclosed){ 
			iunlock(q); 
			freeb(b); 
			error(q->err); 
		} 
 
		/* room available: leave the loop with the queue still ilock'ed */
		if(q->len < q->limit) 
			break; 
 
1995/0714    
		if(q->noblock){ 
			/* full and non-blocking: drop the block but report it written */
1998/0328    
			iunlock(q); 
1995/0714    
			freeb(b); 
1993/1227    
			qunlock(&q->wlock); 
1994/0208    
			poperror(); 
1995/0714    
			return n; 
1993/1227    
		} 
1998/0328    
 
1995/0714    
		q->state |= Qflow; 
1998/0328    
		iunlock(q); 
1995/0714    
		sleep(&q->wr, qnotfull, q); 
1993/0526    
	} 
 
	/* append b to the queue and account for it */
1995/0714    
	if(q->bfirst) 
		q->blast->next = b; 
	else 
		q->bfirst = b; 
	q->blast = b; 
1997/0327    
	b->next = 0; 
1998/0918    
	q->len += BALLOC(b); 
1998/0922    
	q->dlen += n; 
1998/0605    
	QDEBUG checkb(b, "qbwrite"); 
1995/0714    
 
	/* a reader marked the queue starved: wake it once the lock is dropped */
	if(q->state & Qstarve){ 
		q->state &= ~Qstarve; 
		dowakeup = 1; 
	} 
 
	iunlock(q); 
 
	if(dowakeup){ 
		if(q->kick) 
1997/0327    
			q->kick(q->arg); 
1995/0714    
		wakeup(&q->rr); 
	} 
 
	qunlock(&q->wlock); 
	poperror(); 
	return n; 
} 
 
/* 
1999/0219    
 *  write to a queue.  only Maxatomic bytes at a time is atomic. 
1995/0714    
 */ 
1997/0327    
int 
1995/0714    
qwrite(Queue *q, void *vp, int len) 
{ 
	int n, sofar; 
	Block *b; 
	uchar *p = vp; 
 
1997/0327    
	QDEBUG if(islo() == 0) 
1999/0501    
		print("qwrite hi %lux\n", getcallerpc(&q)); 
1995/0714    
 
	sofar = 0; 
1994/0208    
	do { 
		n = len-sofar; 
1999/0219    
		if(n > Maxatomic) 
			n = Maxatomic; 
1993/0528    
 
1994/0208    
		b = allocb(n); 
1999/0603    
		tagwithpc(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); 
1999/0714    
		setmalloctag(b, (up->text[0]<<24)|(up->text[1]<<16)|(up->text[2]<<8)|up->text[3]); 
1994/0804    
		if(waserror()){ 
			freeb(b); 
			nexterror(); 
		} 
1994/0208    
		memmove(b->wp, p+sofar, n); 
1995/0714    
		poperror(); 
1994/0208    
		b->wp += n; 
1994/0929    
 
1995/0714    
		qbwrite(q, b); 
1994/0929    
 
1994/0208    
		sofar += n; 
	} while(sofar < len && (q->state & Qmsg) == 0); 
1993/0528    
 
1993/0526    
	return len; 
} 
 
1993/0528    
/* 
1995/0714    
 *  used by print() to write to a queue.  Since we may be splhi or not in 
 *  a process, don't qlock. 
1994/1124    
 */ 
1997/0327    
int 
1994/1124    
qiwrite(Queue *q, void *vp, int len) 
{ 
	int n, sofar, dowakeup; 
	Block *b; 
	uchar *p = vp; 
 
	dowakeup = 0; 
 
	sofar = 0; 
	do { 
		n = len-sofar; 
1999/0219    
		if(n > Maxatomic) 
			n = Maxatomic; 
1994/1124    
 
		b = allocb(n); 
		memmove(b->wp, p+sofar, n); 
		b->wp += n; 
 
		ilock(q); 
 
		QDEBUG checkb(b, "qiwrite"); 
1995/0714    
		if(q->bfirst) 
			q->blast->next = b; 
		else 
			q->bfirst = b; 
		q->blast = b; 
1998/0918    
		q->len += BALLOC(b); 
1998/0922    
		q->dlen += n; 
1994/1124    
 
1995/0714    
		if(q->state & Qstarve){ 
			q->state &= ~Qstarve; 
			dowakeup = 1; 
1994/1124    
		} 
 
		iunlock(q); 
 
		if(dowakeup){ 
			if(q->kick) 
1997/0327    
				q->kick(q->arg); 
1994/1124    
			wakeup(&q->rr); 
		} 
 
		sofar += n; 
	} while(sofar < len && (q->state & Qmsg) == 0); 
 
	return len; 
} 
 
/*
 *  close a queue and release its memory.  Nothing keeps
 *  track of who else still holds a pointer to the queue,
 *  so the caller must be sure that nobody does.
 */
void
qfree(Queue *q)
{
	qclose(q);
	free(q);
}
 
/*
 *  Mark a queue as closed.  No further IO is permitted.
 *  All queued blocks are released, the error string is set
 *  to Ehungup and sleeping readers and writers are woken.
 *  A nil queue is ignored.
 */
void
qclose(Queue *q)
{
	Block *pending;

	if(q == nil)
		return;

	/* mark it and detach the block list under the lock */
	ilock(q);
	q->state = (q->state | Qclosed) & ~(Qflow|Qstarve);
	strcpy(q->err, Ehungup);
	pending = q->bfirst;
	q->bfirst = 0;
	q->len = 0;
	q->dlen = 0;
	q->noblock = 0;
	iunlock(q);

	/* free queued blocks */
	freeblist(pending);

	/* wake up readers/writers */
	wakeup(&q->rr);
	wakeup(&q->wr);
}
1993/0527    
 
1993/0528    
/* 
 *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued 
 *  blocks. 
 */ 
void 
1997/0327    
qhangup(Queue *q, char *msg) 
1993/0528    
{ 
	/* mark it */ 
1994/0323    
	ilock(q); 
1993/0528    
	q->state |= Qclosed; 
1997/0327    
	if(msg == 0 || *msg == 0) 
		strcpy(q->err, Ehungup); 
	else 
		strncpy(q->err, msg, ERRLEN-1); 
1994/0323    
	iunlock(q); 
1993/0527    
 
1993/0528    
	/* wake up readers/writers */ 
	wakeup(&q->rr); 
	wakeup(&q->wr); 
1998/1127    
} 
 
/*
 *  non-zero if the queue has been closed or hung up
 *  (the value is the Qclosed bit of the state word)
 */
int
qisclosed(Queue *q)
{
	int closed;

	closed = q->state & Qclosed;
	return closed;
}
 
/*
 *  undo a close or hangup: clear Qclosed, mark the queue
 *  starved, reset the eof count and restore the limit
 *  the queue was opened with
 */
void
qreopen(Queue *q)
{
	ilock(q);
	q->state = (q->state & ~Qclosed) | Qstarve;
	q->eof = 0;
	q->limit = q->inilim;
	iunlock(q);
}
 
/*
 *  number of data bytes currently queued
 */
int
qlen(Queue *q)
{
	int nbytes;

	nbytes = q->dlen;
	return nbytes;
}
 
/*
 *  bytes that can still be allocated to the queue before
 *  it reaches its limit; never negative
 */
int
qwindow(Queue *q)
{
	int room;

	room = q->limit - q->len;
	return room < 0 ? 0 : room;
}
 
/*
 *  1 if a block is queued, so that a read will not
 *  have to wait; 0 otherwise
 */
int
qcanread(Queue *q)
{
	if(q->bfirst == 0)
		return 0;
	return 1;
}
 
/*
 *  set the queue's byte limit.  The limit given to
 *  qopen (inilim) is left as it was.
 */
void
qsetlimit(Queue *q, int limit)
{
	q->limit = limit;
}
 
/*
 *  choose what a write to a full queue does: with onoff
 *  non-zero qbwrite returns at once instead of sleeping
 */
void
qnoblock(Queue *q, int onoff)
{
	q->noblock = onoff;
}
 
/*
 *  throw away everything queued and wake any
 *  sleeping writer
 */
void
qflush(Queue *q)
{
	Block *pending;

	/* detach the block list under the lock */
	ilock(q);
	pending = q->bfirst;
	q->bfirst = 0;
	q->dlen = 0;
	q->len = 0;
	iunlock(q);

	/* free queued blocks */
	freeblist(pending);

	/* wake up writers */
	wakeup(&q->wr);
}
 
int 
1997/0327    
qfull(Queue *q) 
1994/1124    
{ 
1997/0327    
	return q->state & Qflow; 
1993/0526    
} 
1995/1217    
 
1997/0327    
/*
 *  the queue's raw state word (an or of Q* flags)
 */
int
qstate(Queue *q)
{
	int state;

	state = q->state;
	return state;
}


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)