plan 9 kernel history: overview | file list | diff list

2001/0128/port/qio.c (diff list | history)

2001/0127/sys/src/9/port/qio.c:773,7972001/0128/sys/src/9/port/qio.c:773,788 (short | long | prev | next)
1993/0526    
} 
 
1993/0528    
/* 
1995/0714    
 *  get next block from a queue (up to a limit) 
2001/0128    
 *  wait for the queue to be non-empty or closed. 
 *  called with q ilocked. 
1993/0528    
 */ 
1995/0714    
Block* 
qbread(Queue *q, int len) 
2001/0128    
static int 
qwait(Queue *q) 
1993/0526    
{ 
1995/0714    
	Block *b, *nb; 
1994/1116    
	int n, dowakeup; 
2001/0128    
	Block *b; 
1993/0526    
 
	qlock(&q->rlock); 
1993/0527    
	if(waserror()){ 
		qunlock(&q->rlock); 
		nexterror(); 
	} 
1993/0526    
                 
1993/0528    
	/* wait for data */ 
	for(;;){ 
		/* sync with qwrite/qproduce */ 
1994/0323    
		ilock(q); 
1993/0526    
                 
1993/0728    
		b = q->bfirst; 
1997/0327    
		if(b){ 
			QDEBUG checkb(b, "qbread 0"); 
2001/0127/sys/src/9/port/qio.c:799,8092001/0128/sys/src/9/port/qio.c:790,797
1997/0327    
		} 
1993/0728    
 
1993/0528    
		if(q->state & Qclosed){ 
1994/0323    
			iunlock(q); 
1993/0908    
			poperror(); 
			qunlock(&q->rlock); 
			if(++q->eof > 3) 
1997/0327    
				error(q->err); 
2001/0128    
				return -1; 
1993/0528    
			return 0; 
1993/0527    
		} 
 
2001/0127/sys/src/9/port/qio.c:810,8532001/0128/sys/src/9/port/qio.c:798,860
1995/0714    
		q->state |= Qstarve;	/* flag requesting producer to wake me */ 
		iunlock(q); 
		sleep(&q->rr, notempty, q); 
2001/0128    
		ilock(q); 
1993/0526    
	} 
2001/0128    
	return 1; 
} 
1993/0526    
 
1993/0528    
	/* remove a buffered block */ 
2001/0128    
/* 
 *  called with q ilocked 
 */ 
static Block* 
qremove(Queue *q) 
{ 
	Block *b; 
 
	b = q->bfirst; 
	if(b == nil) 
		return nil; 
1993/0528    
	q->bfirst = b->next; 
1997/0327    
	b->next = 0; 
1993/0528    
	n = BLEN(b); 
1998/0922    
	q->dlen -= n; 
2001/0128    
	b->next = nil; 
	q->dlen -= BLEN(b); 
1998/0918    
	q->len -= BALLOC(b); 
1997/0327    
	QDEBUG checkb(b, "qbread 1"); 
2001/0128    
	QDEBUG checkb(b, "qremove"); 
	return b; 
} 
1993/0528    
 
1997/1102    
	/* split block if it's too big and this is not a message-oriented queue */ 
1995/0714    
	nb = b; 
	if(n > len){ 
		if((q->state&Qmsg) == 0){ 
1995/0917    
			iunlock(q); 
                 
1995/0714    
			n -= len; 
			b = allocb(n); 
			memmove(b->wp, nb->rp+len, n); 
			b->wp += n; 
1993/0526    
                 
1995/0917    
			ilock(q); 
1995/0714    
			b->next = q->bfirst; 
			if(q->bfirst == 0) 
				q->blast = b; 
			q->bfirst = b; 
1998/0918    
			q->len += BALLOC(b); 
1998/0922    
			q->dlen += n; 
1995/0714    
		} 
		nb->wp = nb->rp + len; 
2001/0128    
/* 
 *  put a block back to the front of the queue 
 *  called with q ilocked 
 */ 
static void 
qputback(Queue *q, Block *b) 
{ 
	if(q->state & (Qclosed|Qmsg)){ 
		freeb(b); 
		return; 
1993/0526    
	} 
2001/0128    
	b->next = q->bfirst; 
	if(q->bfirst == nil) 
		q->blast = b; 
	q->bfirst = b; 
	q->len += BALLOC(b); 
	q->dlen += BLEN(b); 
} 
1999/0115    
 
2001/0128    
/* 
 *  flow control, get producer going again 
 *  called with q ilocked 
 */ 
static void 
qwakeup_iunlock(Queue *q) 
{ 
	int dowakeup = 0; 
 
1999/0115    
	/* if writer flow controlled, restart */ 
	if((q->state & Qflow) && q->len < q->limit/2){ 
		q->state &= ~Qflow; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
2001/0128    
	} 
1993/0527    
 
1995/0714    
	iunlock(q); 
 
2001/0127/sys/src/9/port/qio.c:857,8632001/0128/sys/src/9/port/qio.c:864,920
1997/0327    
			q->kick(q->arg); 
1993/0528    
		wakeup(&q->wr); 
1994/0324    
	} 
2001/0128    
} 
1993/0528    
 
2001/0128    
/* 
 *  get next block from a queue (up to a limit) 
 */ 
Block* 
qbread(Queue *q, int len) 
{ 
	Block *b, *nb; 
	int n; 
 
	qlock(&q->rlock); 
	if(waserror()){ 
		qunlock(&q->rlock); 
		nexterror(); 
	} 
 
	ilock(q); 
	switch(qwait(q)){ 
	case 0: 
		/* queue closed */ 
		iunlock(q); 
		qunlock(&q->rlock); 
		poperror(); 
		return nil; 
	case -1: 
		/* multiple reads on a closed queue */ 
		iunlock(q); 
		error(q->err); 
	} 
 
	/* if we get here, there's at least one block in the queue */ 
	b = qremove(q); 
	n = BLEN(b); 
 
	/* split block if it's too big and this is not a message queue */ 
	nb = b; 
	if(n > len){ 
		if((q->state&Qmsg) == 0){ 
			n -= len; 
			b = allocb(n); 
			memmove(b->wp, nb->rp+len, n); 
			b->wp += n; 
			qputback(q, b); 
		} 
		nb->wp = nb->rp + len; 
	} 
 
	/* restart producer */ 
	qwakeup_iunlock(q); 
 
1993/0527    
	poperror(); 
1993/0526    
	qunlock(&q->rlock); 
1995/0714    
	return nb; 
2001/0127/sys/src/9/port/qio.c:870,9042001/0128/sys/src/9/port/qio.c:927,1023
1995/0714    
long 
qread(Queue *q, void *vp, int len) 
{ 
	Block *b; 
2001/0127    
	int m; 
	uchar *p; 
	uchar *e; 
2001/0128    
	Block *b, *first, *next, **l; 
	int m, n; 
	uchar *p = vp; 
1995/0714    
 
2001/0127    
	p = vp; 
2001/0128    
	qlock(&q->rlock); 
	if(waserror()){ 
		qunlock(&q->rlock); 
		nexterror(); 
	} 
1995/0714    
 
2001/0127    
	if((q->state & Qcoalesce) == 0){ 
		b = qbread(q, len); 
		if(b == 0) 
			return 0; 
2001/0128    
	ilock(q); 
again: 
	switch(qwait(q)){ 
	case 0: 
		/* queue closed */ 
		iunlock(q); 
		qunlock(&q->rlock); 
		poperror(); 
		return 0; 
	case -1: 
		/* multiple reads on a closed queue */ 
		iunlock(q); 
		error(q->err); 
	} 
2001/0127    
 
2001/0128    
	/* if we get here, there's at least one block in the queue */ 
	if(q->state & Qcoalesce){ 
		/* when coalescing, 0 length blocks just go away */ 
		if(q->dlen <= 0){ 
			freeb(qremove(q)); 
			goto again; 
		} 
 
		/*  grab the first block plus as many 
		 *  following blocks as will completely 
		 *  fit in the read. 
		 */ 
		l = &first; 
		b = q->bfirst; 
2001/0127    
		m = BLEN(b); 
		memmove(p, b->rp, m); 
		freeb(b); 
		return m; 
2001/0128    
		n = 0; 
		for(;;) { 
			*l = qremove(q); 
			l = &b->next; 
			n += m; 
			b = q->bfirst; 
			if(b == nil) 
				break; 
			m = BLEN(b); 
			if(n+m > len) 
				break; 
		} 
	} else { 
		first = qremove(q); 
2001/0127    
	} 
 
	for(e = p + len; p < e; p += m){ 
		b = qbread(q, e-p); 
		if(b == 0) 
			return 0; 
                 
2001/0128    
	/* copy to user space outside of the ilock */ 
	iunlock(q); 
	n = 0; 
	for(b = first; b != nil; b = next){ 
2001/0127    
		m = BLEN(b); 
2001/0128    
		if(m > len-n){ 
			m = len - n; 
			n = len; 
			memmove(p, b->rp, m); 
			b->rp += m; 
			break; 
		} 
2001/0127    
		memmove(p, b->rp, m); 
2001/0128    
		p += m; 
		n += m; 
		next = b->next; 
		b->next = nil; 
2001/0127    
		freeb(b); 
	} 
2001/0128    
	ilock(q); 
2001/0127    
 
	return p-(uchar*)vp; 
2001/0128    
	/* take care of any left over partial block */ 
	if(b != nil){ 
		if(q->state & Qmsg) 
			freeb(b); 
		else 
			qputback(q, b); 
	} 
 
	/* restart producer */ 
	qwakeup_iunlock(q); 
 
	poperror(); 
	qunlock(&q->rlock); 
	return n; 
1995/0714    
} 
 
1993/0528    
static int 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)