plan 9 kernel history: overview | file list | diff list

1993/0528/port/qio.c (diff list | history)

1993/0527/sys/src/9/port/qio.c:150,159 -> 1993/0528/sys/src/9/port/qio.c:150,159 (short | long | prev | next)
1993/0527    
			cl->first = p->next; 
1993/0526    
			unlock(cl); 
			b = (Block *)p; 
1993/0528    
			memset(b, 0, sizeof(Block)); 
1993/0526    
			b->base = (uchar*)(b+1); 
			b->wp = b->rp = b->base; 
			b->lim = b->base + (1<<pow) - sizeof(Block); 
1993/0527    
			b->flag = 0; 
1993/0526    
			return b; 
		} 
	panic("iallocb %d\n", size); 
1993/0527/sys/src/9/port/qio.c:186,191 -> 1993/0528/sys/src/9/port/qio.c:186,192
1993/0526    
	if(b == 0) 
		exhausted("Blocks"); 
 
1993/0528    
	memset(b, 0, sizeof(Block)); 
1993/0526    
	b->base = (uchar*)(b+1); 
	b->rp = b->wp = b->base; 
	b->lim = b->base + size; 
1993/0527/sys/src/9/port/qio.c:202,208 -> 1993/0528/sys/src/9/port/qio.c:203,209
1993/0526    
qconsume(Queue *q, uchar *p, int len) 
{ 
	Block *b; 
	int n; 
1993/0528    
	int n, dowakeup; 
1993/0526    
 
1993/0527    
	/* sync with qwrite */ 
1993/0526    
	lock(q); 
1993/0527/sys/src/9/port/qio.c:213,218 -> 1993/0528/sys/src/9/port/qio.c:214,220
1993/0526    
		unlock(q); 
		return -1; 
	} 
1993/0528    
 
1993/0526    
	n = BLEN(b); 
	if(n < len) 
		len = n; 
1993/0527/sys/src/9/port/qio.c:223,234 -> 1993/0528/sys/src/9/port/qio.c:225,243
1993/0526    
		b->rp += len; 
	q->len -= len; 
 
	/* wakeup flow controlled writers (with a bit of hysteresis) */ 
	if(q->len+len >= q->limit && q->len < q->limit/2) 
		wakeup(&q->r); 
1993/0528    
	/* if writer flow controlled, restart */ 
	if((q->state & Qflow) && q->len < q->limit/2){ 
		q->state &= ~Qflow; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1993/0526    
 
	unlock(q); 
 
1993/0528    
	if(dowakeup) 
		wakeup(&q->wr); 
 
	/* discard the block if we're done with it */ 
1993/0527    
	if((q->state & Qmsg) || len == n) 
1993/0526    
		ifree(b); 
 
1993/0527/sys/src/9/port/qio.c:235,264 -> 1993/0528/sys/src/9/port/qio.c:244,258
1993/0526    
	return len; 
} 
 
static int 
qproduce0(Queue *q, uchar *p, int len) 
1993/0528    
int 
qproduce(Queue *q, uchar *p, int len) 
1993/0526    
{ 
	Block *b; 
	int n; 
1993/0528    
	int dowakeup; 
1993/0526    
 
1993/0527    
	/* sync with qread */ 
1993/0526    
	lock(q); 
1993/0527    
 
1993/0526    
	b = q->rfirst; 
	if(b){ 
		/* hand to waiting receiver */ 
1993/0527    
		q->rfirst = b->next; 
		unlock(q); 
1993/0526    
		n = b->lim - b->wp; 
		if(n < len) 
			len = n; 
		memmove(b->wp, p, len); 
		b->wp += len; 
1993/0527    
		b->flag |= Bfilled; 
1993/0526    
		wakeup(&b->r); 
		return len; 
	} 
                 
	/* no waiting receivers, room in buffer? */ 
	if(q->len >= q->limit){ 
		unlock(q); 
1993/0527/sys/src/9/port/qio.c:279,285 -> 1993/0528/sys/src/9/port/qio.c:273,278
1993/0526    
		} 
1993/0527    
		memmove(b->wp, p, len); 
1993/0526    
		b->wp += len; 
1993/0527    
		b->flag |= Bfilled; 
1993/0526    
		if(q->bfirst) 
			q->blast->next = b; 
		else 
1993/0527/sys/src/9/port/qio.c:287,312 -> 1993/0528/sys/src/9/port/qio.c:280,298
1993/0526    
		q->blast = b; 
	} 
	q->len += len; 
1993/0528    
	if(q->state & Qstarve){ 
		q->state &= ~Qstarve; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1993/0526    
	unlock(q); 
1993/0527    
 
1993/0528    
	if(dowakeup) 
		wakeup(&q->rr); 
 
1993/0526    
	return len; 
} 
 
int 
qproduce(Queue *q, uchar *p, int len) 
{ 
	int n, sofar; 
                 
1993/0527    
	sofar = 0; 
	do { 
		n = qproduce0(q, p + sofar, len - sofar); 
1993/0526    
		if(n < 0) 
			break; 
1993/0527    
		sofar += n; 
	} while(sofar < len && (q->state & Qmsg) == 0); 
1993/0526    
	return sofar; 
} 
                 
/* 
 *  called by non-interrupt code 
 */ 
1993/0527/sys/src/9/port/qio.c:317,323 -> 1993/0528/sys/src/9/port/qio.c:303,311
1993/0526    
 
	q = malloc(sizeof(Queue)); 
	if(q == 0) 
		exhausted("Queues"); 
1993/0528    
		return 0; 
 
	memset(q, 0, sizeof(Queue)); 
1993/0526    
	q->limit = limit; 
	q->kick = kick; 
	q->arg = arg; 
1993/0527/sys/src/9/port/qio.c:326,573 -> 1993/0528/sys/src/9/port/qio.c:314,520
1993/0526    
	return q; 
} 
 
1993/0527    
ulong qrtoomany; 
ulong qrtoofew; 
                 
1993/0526    
static int 
bfilled(void *a) 
1993/0528    
notempty(void *a) 
1993/0526    
{ 
	Block *b = a; 
1993/0528    
	Queue *q = a; 
1993/0526    
 
1993/0527    
	return b->flag & Bfilled; 
1993/0528    
	return q->bfirst != 0; 
1993/0526    
} 
 
1993/0528    
/* 
 *  read a queue.  if no data is queued, post a Block 
 *  and wait on its Rendez. 
 */ 
1993/0526    
long 
qread(Queue *q, char *p, int len) 
{ 
1993/0527    
	Block *b, *bb, **l; 
1993/0526    
	int x, n; 
1993/0528    
	Block *b; 
	int x, n, dowakeup; 
1993/0526    
 
	qlock(&q->rlock); 
1993/0527    
	b = 0; 
	if(waserror()){ 
		qunlock(&q->rlock); 
		if(b) 
			free(b); 
		nexterror(); 
	} 
1993/0526    
 
1993/0527    
	/* 
	 *  If there are no buffered blocks, allocate a block 
	 *  for the qproducer/qwrite to fill.  This is 
	 *  optimistic and we will 
	 *  sometimes be wrong: after locking we may either 
	 *  have to throw away or allocate one. 
	 * 
	 *  We hope to replace the allocb with a kmap later on. 
	 */ 
retry: 
	if(q->bfirst == 0) 
		b = allocb(len); 
1993/0528    
	/* wait for data */ 
	for(;;){ 
		/* sync with qwrite/qproduce */ 
		x = splhi(); 
		lock(q); 
1993/0526    
 
1993/0527    
	/* sync with qwrite/qproduce */ 
1993/0526    
	x = splhi(); 
	lock(q); 
1993/0527    
                 
1993/0526    
	bb = q->bfirst; 
	if(bb == 0){ 
1993/0527    
		if(b == 0){ 
			/* we guessed wrong, drop the locks and try again */ 
1993/0528    
		if(q->state & Qclosed){ 
1993/0527    
			unlock(q); 
			splx(x); 
			qrtoofew++; 
			goto retry; 
1993/0528    
			return 0; 
1993/0527    
		} 
 
		/* add ourselves to the list of readers */ 
1993/0526    
		if(q->rfirst) 
			q->rlast->next = b; 
		else 
			q->rfirst = b; 
		q->rlast = b; 
1993/0528    
		b = q->bfirst; 
		if(b) 
			break; 
		q->state |= Qstarve; 
1993/0526    
		unlock(q); 
		splx(x); 
		qunlock(&q->rlock); 
1993/0527    
		poperror(); 
                 
		if(waserror()){ 
			/* on error, unlink us from the chain */ 
			x = splhi(); 
			lock(q); 
			l = &q->rfirst; 
			for(bb = q->rfirst; bb; bb = bb->next){ 
				if(b == bb){ 
					*l = bb->next; 
					break; 
				} else 
					l = &bb->next; 
			} 
			unlock(q); 
			splx(x); 
			free(b); 
			nexterror(); 
		} 
                 
		/* wait for the producer */ 
1993/0526    
		sleep(&b->r, bfilled, b); 
		n = BLEN(b); 
		memmove(p, b->rp, n); 
1993/0527    
		poperror(); 
1993/0526    
		free(b); 
1993/0527    
                 
1993/0526    
		return n; 
1993/0528    
		sleep(&q->rr, notempty, q); 
1993/0526    
	} 
 
	/* copy from a buffered block */ 
	q->bfirst = bb->next; 
	n = BLEN(bb); 
	if(n > len) 
		n = len; 
1993/0528    
	/* remove a buffered block */ 
	q->bfirst = b->next; 
	n = BLEN(b); 
1993/0526    
	q->len -= n; 
1993/0528    
 
	/* if writer flow controlled, restart */ 
	if((q->state & Qflow) && q->len < q->limit/2){ 
		q->state &= ~Qflow; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1993/0526    
	unlock(q); 
	splx(x); 
1993/0527    
 
	/* do this outside of the lock(q)! */ 
1993/0526    
	memmove(p, bb->rp, n); 
	bb->rp += n; 
1993/0528    
	if(n > len) 
		n = len; 
	memmove(p, b->rp, n); 
	b->rp += n; 
1993/0526    
 
1993/0527    
	/* free it or put it back on the queue */ 
	if(bb->rp >= bb->wp || (q->state&Qmsg)) 
1993/0526    
		free(bb); 
1993/0528    
	/* free it or put what's left back on the queue */ 
	if(b->rp >= b->wp || (q->state&Qmsg)) 
		free(b); 
1993/0526    
	else { 
		x = splhi(); 
		lock(q); 
		bb->next = q->bfirst; 
		q->bfirst = bb; 
1993/0528    
		b->next = q->bfirst; 
		q->bfirst = b; 
		q->len += BLEN(b); 
1993/0526    
		unlock(q); 
		splx(x); 
	} 
1993/0527    
 
1993/0528    
	/* wakeup flow controlled writers (with a bit of hysteresis) */ 
	if(dowakeup) 
		wakeup(&q->wr); 
 
1993/0527    
	poperror(); 
1993/0526    
	qunlock(&q->rlock); 
1993/0527    
	if(b){ 
		qrtoomany++; 
		free(b); 
	} 
1993/0526    
	return n; 
} 
 
1993/0527    
ulong qwtoomany; 
ulong qwtoofew; 
1993/0526    
                 
static long 
1993/0527    
qwrite0(Queue *q, char *p, int len, Block *b) 
1993/0528    
static int 
qnotfull(void *a) 
1993/0526    
{ 
1993/0527    
	Block *bb; 
	int x, n, sofar; 
1993/0528    
	Queue *q = a; 
1993/0526    
 
1993/0527    
	/* sync with qconsume/qread */ 
1993/0526    
	x = splhi(); 
	lock(q); 
1993/0528    
	return q->len < q->limit; 
} 
1993/0527    
 
	sofar = 0; 
	while(bb = q->rfirst){ 
1993/0526    
		/* hand to waiting receiver */ 
		q->rfirst = bb->next; 
		unlock(q); 
		splx(x); 
1993/0528    
/* 
 *  write to a queue.  if no reader blocks are posted 
 *  queue the data. 
 */ 
long 
qwrite(Queue *q, char *p, int len) 
{ 
	int x, dowakeup; 
	Block *b; 
1993/0526    
 
1993/0527    
		n = bb->lim - bb->wp; 
		if(n > len-sofar) 
			n = len - sofar; 
		memmove(bb->wp, p+sofar, n); 
		bb->wp += n; 
		bb->flag |= Bfilled; 
1993/0526    
		wakeup(&bb->r); 
1993/0528    
	b = allocb(len); 
	memmove(b->wp, p, len); 
	b->wp += len; 
1993/0526    
 
1993/0527    
		sofar += n; 
		if(sofar == len){ 
			if(b){ 
				free(b);	/* we were wrong to allocate */ 
				qwtoomany++; 
			} 
			return len; 
		} 
1993/0528    
	/* flow control */ 
	while(!qnotfull(q)){ 
		qlock(&q->wlock); 
		q->state |= Qflow; 
		sleep(&q->wr, qnotfull, q); 
		qunlock(&q->wlock); 
1993/0526    
	} 
 
1993/0527    
	/* buffer whatever is left */ 
	if(b == 0){ 
		/* we should have alloc'd, return to qwrite and have it do it */ 
1993/0528    
	x = splhi(); 
	lock(q); 
 
	if(q->state & Qclosed){ 
1993/0527    
		unlock(q); 
		splx(x); 
		qwtoofew++; 
		return sofar; 
1993/0528    
		error(Ehungup); 
1993/0527    
	} 
	b->rp += sofar; 
1993/0526    
 
	x = splhi(); 
	lock(q); 
	if(q->bfirst) 
		q->blast->next = b; 
	else 
		q->bfirst = b; 
	q->blast = b; 
1993/0528    
	b->next = q->bfirst; 
	q->bfirst = b; 
1993/0526    
	q->len += len; 
	if((q->state & Qstarve) && q->kick){ 
1993/0528    
 
	if(q->state & Qstarve){ 
1993/0526    
		q->state &= ~Qstarve; 
		(*q->kick)(q->arg); 
	} 
1993/0528    
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
 
1993/0526    
	unlock(q); 
	splx(x); 
 
1993/0528    
	if(dowakeup) 
		wakeup(&q->rr); 
 
1993/0526    
	return len; 
} 
 
1993/0527    
static int 
qnotfull(void *a) 
1993/0528    
/* 
 *  Mark a queue as closed.  No further IO is permitted. 
 *  All blocks are released. 
 */ 
void 
qclose(Queue *q) 
1993/0527    
{ 
	Queue *q = a; 
1993/0528    
	int x; 
	Block *b, *bfirst; 
1993/0527    
 
	return q->len < q->limit; 
} 
1993/0528    
	/* mark it */ 
	x = splhi(); 
	lock(q); 
	q->state |= Qclosed; 
	bfirst = q->bfirst; 
	q->bfirst = 0; 
	unlock(q); 
	splx(x); 
1993/0527    
 
1993/0526    
long 
qwrite(Queue *q, char *p, int len) 
{ 
1993/0527    
	int n, i; 
	Block *b; 
1993/0526    
                 
1993/0527    
	/* 
	 *  If there are no readers, grab a buffer and copy 
	 *  into it before locking anything down.  This 
	 *  provides the highest concurrency but we will 
	 *  sometimes be wrong: after locking we may either 
	 *  have to throw away or allocate one. 
	 */ 
	if(q->rfirst == 0){ 
		b = allocb(len); 
		memmove(b->wp, p, len); 
		b->wp += len; 
	} else 
		b = 0; 
                 
	/* ensure atomic writes */ 
1993/0526    
	qlock(&q->wlock); 
	if(waserror()){ 
		qunlock(&q->wlock); 
		nexterror(); 
1993/0528    
	/* free queued blocks */ 
	while(b = bfirst){ 
		bfirst = b->next; 
		free(b); 
1993/0526    
	} 
 
1993/0527    
	/* flow control */ 
	sleep(&q->r, qnotfull, q); 
1993/0528    
	/* wake up readers/writers */ 
	wakeup(&q->rr); 
	wakeup(&q->wr); 
} 
1993/0527    
 
	n = qwrite0(q, p, len, b); 
	if(n != len){ 
		/* no readers and we need a buffer */ 
		i = len - n; 
		b = allocb(i); 
		memmove(b->wp, p + n, i); 
		b->wp += n; 
		n += qwrite0(q, p + n, i, b); 
1993/0526    
	} 
1993/0528    
/* 
 *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued 
 *  blocks. 
 */ 
void 
qhangup(Queue *q) 
{ 
	int x; 
1993/0526    
 
	qunlock(&q->wlock); 
1993/0527    
	poperror(); 
1993/0528    
	/* mark it */ 
	x = splhi(); 
	lock(q); 
	q->state |= Qclosed; 
	unlock(q); 
	splx(x); 
1993/0527    
 
	return n; 
1993/0528    
	/* wake up readers/writers */ 
	wakeup(&q->rr); 
	wakeup(&q->wr); 
} 
 
/* 
 *  mark a queue as no longer hung up 
 */ 
void 
qreopen(Queue *q) 
{ 
	q->state &= ~Qclosed; 
1993/0526    
} 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)