plan 9 kernel history: overview | file list | diff list

1995/0714/port/qio.c (diff list | history)

1995/0125/sys/src/9/port/qio.c:41,491995/0714/sys/src/9/port/qio.c:41,46 (short | long | prev | next)
1993/0530    
	Rendez	rr;		/* process waiting to read */ 
	QLock	wlock;		/* mutex for writing processes */ 
	Rendez	wr;		/* process waiting to write */ 
1994/0208    
                 
1994/0321    
	uchar*	syncbuf;	/* synchronous IO buffer */ 
1994/0208    
	int	synclen;	/* syncbuf length */ 
1993/0530    
}; 
 
enum 
1995/0125/sys/src/9/port/qio.c:224,2301995/0714/sys/src/9/port/qio.c:221,227
1993/0528    
int 
1994/0311    
qpass(Queue *q, Block *b) 
{ 
1994/0323    
	int i, len, dowakeup; 
1995/0714    
	int len, dowakeup; 
1994/0311    
 
	len = BLEN(b); 
	/* sync with qread */ 
1995/0125/sys/src/9/port/qio.c:231,2551995/0714/sys/src/9/port/qio.c:228,233
1994/0311    
	dowakeup = 0; 
1994/0323    
	ilock(q); 
1994/0311    
 
	if(q->syncbuf){ 
		/* synchronous communications, just copy into buffer */ 
		if(len < q->synclen) 
			q->synclen = len; 
		i = q->synclen; 
		memmove(q->syncbuf, b->rp, i); 
		q->syncbuf = 0;		/* tell reader buffer is full */ 
		len -= i; 
1994/0324    
		if(len <= 0 || (q->state & Qmsg)) { 
1994/0323    
			iunlock(q); 
1994/0311    
			wakeup(&q->rr); 
1994/0322    
			freeb(b); 
1994/0311    
			return i; 
		} 
		dowakeup = 1; 
1994/0324    
		/* queue anything that's left */ 
1994/0311    
		b->rp += i; 
	} 
                 
	/* save in buffer */ 
	if(q->bfirst) 
		q->blast->next = b; 
1995/0125/sys/src/9/port/qio.c:278,2841995/0714/sys/src/9/port/qio.c:256,262
1993/0601    
qproduce(Queue *q, void *vp, int len) 
1993/0526    
{ 
	Block *b; 
1994/0208    
	int i, dowakeup; 
1995/0714    
	int dowakeup; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
1993/0527    
	/* sync with qread */ 
1995/0125/sys/src/9/port/qio.c:285,3091995/0714/sys/src/9/port/qio.c:263,268
1994/0208    
	dowakeup = 0; 
1993/0526    
	lock(q); 
1993/0527    
 
1994/0208    
	if(q->syncbuf){ 
		/* synchronous communications, just copy into buffer */ 
		if(len < q->synclen) 
			q->synclen = len; 
		i = q->synclen; 
		memmove(q->syncbuf, p, i); 
		q->syncbuf = 0;		/* tell reader buffer is full */ 
		len -= i; 
		if(len <= 0 || (q->state & Qmsg)){ 
			unlock(q); 
			wakeup(&q->rr); 
			return i; 
		} 
                 
		/* queue anything that's left */ 
		dowakeup = 1; 
		p += i; 
	} 
                 
1993/0526    
	/* no waiting receivers, room in buffer? */ 
	if(q->len >= q->limit){ 
1994/1116    
		q->state |= Qflow; 
1995/0125/sys/src/9/port/qio.c:362,3751995/0714/sys/src/9/port/qio.c:321,326
1993/0526    
} 
 
static int 
1994/0208    
filled(void *a) 
{ 
	Queue *q = a; 
                 
	return q->syncbuf == 0; 
} 
                 
static int 
1993/0528    
notempty(void *a) 
1993/0526    
{ 
1993/0528    
	Queue *q = a; 
1995/0125/sys/src/9/port/qio.c:378,3921995/0714/sys/src/9/port/qio.c:329,341
1993/0526    
} 
 
1993/0528    
/* 
 *  read a queue.  if no data is queued, post a Block 
 *  and wait on its Rendez. 
1995/0714    
 *  get next block from a queue (up to a limit) 
1993/0528    
 */ 
1993/0526    
long 
1993/0601    
qread(Queue *q, void *vp, int len) 
1995/0714    
Block* 
qbread(Queue *q, int len) 
1993/0526    
{ 
1993/0528    
	Block *b; 
1995/0714    
	Block *b, *nb; 
1994/1116    
	int n, dowakeup; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
	qlock(&q->rlock); 
1993/0527    
	if(waserror()){ 
1995/0125/sys/src/9/port/qio.c:412,4551995/0714/sys/src/9/port/qio.c:361,370
1993/0528    
			return 0; 
1993/0527    
		} 
 
1994/0208    
		if(globalmem(vp)){ 
			/* just let the writer fill the buffer directly */ 
			q->synclen = len; 
			q->syncbuf = vp; 
1994/0323    
			iunlock(q); 
1994/1116    
			if(waserror()){ 
				/* sync with qwrite() & qproduce() */ 
				qlock(&q->wlock); 
				ilock(q); 
				if(q->syncbuf == 0){ 
					/* we got some data before the interrupt */ 
					b = allocb(q->synclen); 
					memmove(b->wp, vp, q->synclen); 
					b->wp += q->synclen; 
					b->next = q->bfirst; 
					if(q->bfirst == 0) 
						q->blast = b; 
					q->bfirst = b; 
					q->len += q->synclen; 
				} 
				q->syncbuf = 0; 
				iunlock(q); 
				qunlock(&q->wlock); 
				nexterror(); 
			} 
1994/0331    
			sleep(&q->rr, filled, q); 
1994/1116    
			poperror(); 
1994/0208    
			len = q->synclen; 
			qunlock(&q->rlock); 
1994/1026    
			poperror(); 
1994/0208    
			return len; 
		} else { 
			q->state |= Qstarve; 
1994/0323    
			iunlock(q); 
			sleep(&q->rr, notempty, q); 
1994/0208    
		} 
1995/0714    
		q->state |= Qstarve;	/* flag requesting producer to wake me */ 
		iunlock(q); 
		sleep(&q->rr, notempty, q); 
1993/0526    
	} 
1994/0902    
	QDEBUG checkb(b, "qread 1"); 
1993/0526    
 
1993/0528    
	/* remove a buffered block */ 
	q->bfirst = b->next; 
1995/0125/sys/src/9/port/qio.c:462,4911995/0714/sys/src/9/port/qio.c:377,404
1993/0528    
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1994/0323    
	iunlock(q); 
1993/0527    
 
	/* do this outside of the lock(q)! */ 
1993/0528    
	if(n > len) 
		n = len; 
	memmove(p, b->rp, n); 
	b->rp += n; 
1995/0714    
	/* split block if its too big and this is not a message oriented queue */ 
	nb = b; 
	if(n > len){ 
		if((q->state&Qmsg) == 0){ 
			n -= len; 
			b = allocb(n); 
			memmove(b->wp, nb->rp+len, n); 
			b->wp += n; 
1993/0526    
 
1994/0902    
	QDEBUG checkb(b, "qread 2"); 
1994/1116    
                 
1993/0725    
	/* free it or put what's left on the queue */ 
1993/0811    
	if(b->rp >= b->wp || (q->state&Qmsg)) { 
1994/0322    
		freeb(b); 
1994/0208    
	} else { 
1994/0323    
		ilock(q); 
1993/0528    
		b->next = q->bfirst; 
1994/1116    
		if(q->bfirst == 0) 
			q->blast = b; 
1993/0528    
		q->bfirst = b; 
		q->len += BLEN(b); 
1994/0323    
		iunlock(q); 
1995/0714    
			b->next = q->bfirst; 
			if(q->bfirst == 0) 
				q->blast = b; 
			q->bfirst = b; 
			q->len += n; 
		} 
		nb->wp = nb->rp + len; 
1993/0526    
	} 
1993/0527    
 
1995/0125    
	/* wakeup flow controlled writers (with a bit of histeria) */ 
1995/0714    
	iunlock(q); 
 
	/* wakeup flow controlled writers */ 
1994/0324    
	if(dowakeup){ 
		if(q->kick) 
			(*q->kick)(q->arg); 
1995/0125/sys/src/9/port/qio.c:494,5021995/0714/sys/src/9/port/qio.c:407,434
1993/0528    
 
1993/0527    
	poperror(); 
1993/0526    
	qunlock(&q->rlock); 
	return n; 
1995/0714    
	return nb; 
1993/0526    
} 
 
1995/0714    
/* 
 *  read a queue.  if no data is queued, post a Block 
 *  and wait on its Rendez. 
 */ 
long 
qread(Queue *q, void *vp, int len) 
{ 
	Block *b; 
 
	b = qbread(q, len); 
	if(b == 0) 
		return 0; 
 
	len = BLEN(b); 
	memmove(vp, b->rp, len); 
	freeb(b); 
	return len; 
} 
 
1993/0528    
static int 
qnotfull(void *a) 
1993/0526    
{ 
1995/0125/sys/src/9/port/qio.c:506,5281995/0714/sys/src/9/port/qio.c:438,453
1993/0528    
} 
1993/0527    
 
1993/0528    
/* 
 *  write to a queue.  if no reader blocks are posted 
 *  queue the data. 
1994/0208    
 * 
1994/0323    
 *  all copies should be outside of ilock since they can fault. 
1995/0714    
 *  add a block to a queue obeying flow control 
1993/0528    
 */ 
long 
1994/0902    
qwrite(Queue *q, void *vp, int len) 
1995/0714    
qbwrite(Queue *q, Block *b) 
1993/0528    
{ 
1994/0323    
	int n, sofar, dowakeup; 
1993/0528    
	Block *b; 
1993/0601    
	uchar *p = vp; 
1995/0714    
	int n, dowakeup; 
1993/0526    
 
1994/0208    
	dowakeup = 0; 
1995/0714    
	n = BLEN(b); 
1993/0526    
 
1995/0108    
if((getstatus()&IE) == 0) 
print("qwrite hi %lux\n", getcallerpc(q)); 
1995/0104    
                 
1994/0208    
	if(waserror()){ 
		qunlock(&q->wlock); 
		nexterror(); 
1995/0125/sys/src/9/port/qio.c:529,5531995/0714/sys/src/9/port/qio.c:454,517
1994/0804    
	} 
1994/0208    
	qlock(&q->wlock); 
 
	sofar = 0; 
	if(q->syncbuf){ 
		if(len < q->synclen) 
			sofar = len; 
		else 
			sofar = q->synclen; 
                 
		memmove(q->syncbuf, p, sofar); 
		q->synclen = sofar; 
		q->syncbuf = 0; 
		wakeup(&q->rr); 
                 
		if(len == sofar || (q->state & Qmsg)){ 
1995/0714    
	/* flow control */ 
	while(!qnotfull(q)){ 
		if(q->noblock){ 
			freeb(b); 
1993/1227    
			qunlock(&q->wlock); 
1994/0208    
			poperror(); 
			return len; 
1995/0714    
			return n; 
1993/1227    
		} 
1995/0714    
		q->state |= Qflow; 
		sleep(&q->wr, qnotfull, q); 
1993/0526    
	} 
 
1995/0714    
	ilock(q); 
 
	if(q->state & Qclosed){ 
		iunlock(q); 
		error(Ehungup); 
	} 
 
	if(q->bfirst) 
		q->blast->next = b; 
	else 
		q->bfirst = b; 
	q->blast = b; 
	q->len += n; 
 
	if(q->state & Qstarve){ 
		q->state &= ~Qstarve; 
		dowakeup = 1; 
	} 
 
	iunlock(q); 
 
	if(dowakeup){ 
		if(q->kick) 
			(*q->kick)(q->arg); 
		wakeup(&q->rr); 
	} 
 
	qunlock(&q->wlock); 
	poperror(); 
	return n; 
} 
 
/* 
 *  write to a queue.  only 128k at a time is atomic. 
 */ 
long 
qwrite(Queue *q, void *vp, int len) 
{ 
	int n, sofar; 
	Block *b; 
	uchar *p = vp; 
 
	QDEBUG if((getstatus()&IE) == 0) 
		print("qwrite hi %lux\n", getcallerpc(q)); 
 
	sofar = 0; 
1994/0208    
	do { 
		n = len-sofar; 
		if(n > 128*1024) 
1995/0125/sys/src/9/port/qio.c:559,6301995/0714/sys/src/9/port/qio.c:523,542
1994/0804    
			nexterror(); 
		} 
1994/0208    
		memmove(b->wp, p+sofar, n); 
1995/0714    
		poperror(); 
1994/0208    
		b->wp += n; 
1994/0929    
 
1994/0208    
		/* flow control */ 
		while(!qnotfull(q)){ 
1994/0902    
			if(q->noblock){ 
1994/0322    
				freeb(b); 
1994/0208    
				qunlock(&q->wlock); 
				poperror(); 
1994/0804    
				poperror(); 
1994/0208    
				return len; 
			} 
			q->state |= Qflow; 
			sleep(&q->wr, qnotfull, q); 
		} 
1995/0714    
		qbwrite(q, b); 
1994/0929    
 
1994/0323    
		ilock(q); 
1994/0929    
                 
1994/0208    
		if(q->state & Qclosed){ 
1994/0323    
			iunlock(q); 
1994/0208    
			error(Ehungup); 
		} 
1994/0804    
		poperror(); 
1994/0222    
                 
1994/0902    
		QDEBUG checkb(b, "qwrite"); 
1994/0208    
		if(q->syncbuf){ 
			/* we guessed wrong and did an extra copy */ 
			if(n > q->synclen) 
				n = q->synclen; 
			memmove(q->syncbuf, b->rp, n); 
			q->synclen = n; 
			q->syncbuf = 0; 
			dowakeup = 1; 
1994/0322    
			freeb(b); 
1994/0208    
		} else { 
			/* we guessed right, queue it */ 
			if(q->bfirst) 
				q->blast->next = b; 
			else 
				q->bfirst = b; 
			q->blast = b; 
			q->len += n; 
1994/0929    
                 
1994/0208    
			if(q->state & Qstarve){ 
				q->state &= ~Qstarve; 
				dowakeup = 1; 
			} 
		} 
                 
1994/0323    
		iunlock(q); 
1993/0526    
                 
1994/0208    
		if(dowakeup){ 
			if(q->kick) 
				(*q->kick)(q->arg); 
			wakeup(&q->rr); 
		} 
1993/0528    
                 
1994/0208    
		sofar += n; 
	} while(sofar < len && (q->state & Qmsg) == 0); 
1993/0528    
 
1994/0208    
	qunlock(&q->wlock); 
	poperror(); 
1993/0526    
	return len; 
} 
 
1993/0528    
/* 
1994/1124    
 *  used by print() to write to a queue 
1995/0714    
 *  used by print() to write to a queue.  Since we may be splhi or not in 
 *  a process, don't qlock. 
1994/1124    
 */ 
int 
qiwrite(Queue *q, void *vp, int len) 
1995/0125/sys/src/9/port/qio.c:648,6751995/0714/sys/src/9/port/qio.c:560,575
1994/1124    
		ilock(q); 
 
		QDEBUG checkb(b, "qiwrite"); 
		if(q->syncbuf){ 
			/* we guessed wrong and did an extra copy */ 
			if(n > q->synclen) 
				n = q->synclen; 
			memmove(q->syncbuf, b->rp, n); 
			q->synclen = n; 
			q->syncbuf = 0; 
			dowakeup = 1; 
			freeb(b); 
		} else { 
			/* we guessed right, queue it */ 
			if(q->bfirst) 
				q->blast->next = b; 
			else 
				q->bfirst = b; 
			q->blast = b; 
			q->len += n; 
1995/0714    
		if(q->bfirst) 
			q->blast->next = b; 
		else 
			q->bfirst = b; 
		q->blast = b; 
		q->len += n; 
1994/1124    
 
			if(q->state & Qstarve){ 
				q->state &= ~Qstarve; 
				dowakeup = 1; 
			} 
1995/0714    
		if(q->state & Qstarve){ 
			q->state &= ~Qstarve; 
			dowakeup = 1; 
1994/1124    
		} 
 
		iunlock(q); 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)