plan 9 kernel history: overview | file list | diff list

1994/0208/port/qio.c (diff list | history)

1993/1227/sys/src/9/port/qio.c:79,84 – 1994/0208/sys/src/9/port/qio.c:79,87 (short | long | prev | next)
1993/0530    
	Rendez	rr;		/* process waiting to read */ 
	QLock	wlock;		/* mutex for writing processes */ 
	Rendez	wr;		/* process waiting to write */ 
1994/0208    
 
	uchar	*syncbuf;	/* synchronous IO buffer */ 
	int	synclen;	/* syncbuf length */ 
1993/0530    
}; 
 
enum 
1993/1227/sys/src/9/port/qio.c:188,204 – 1994/0208/sys/src/9/port/qio.c:191,220
1993/0526    
} 
 
void 
iallocinit(void) 
1994/0208    
qinit(void) 
1993/0526    
{ 
	int pow; 
	Chunkl *cl; 
1994/0208    
	Chunk *p; 
1993/0526    
 
1994/0208    
	/* start with a bunch of initial blocks */ 
1993/0526    
	for(pow = Minpow; pow <= Maxpow; pow++){ 
		cl = &arena.alloc[pow]; 
1993/1102    
		cl->goal = Maxpow-pow + 16; 
1994/0208    
		cl->goal = Maxpow-pow + 32; 
		cl->first = 0; 
		for(; cl->have < cl->goal; cl->have++){ 
			p = malloc(1<<pow); 
			p->next = cl->first; 
			cl->first = p; 
		} 
1993/0526    
	} 
 
	/* start garbage collector */ 
1994/0208    
} 
 
void 
iallocinit(void) 
{ 
	/* start garbage collector/creator */ 
1993/0725    
	kproc("ialloc", iallockproc, 0); 
1993/0526    
} 
 
1993/1227/sys/src/9/port/qio.c:216,230 – 1994/0208/sys/src/9/port/qio.c:232,250
1993/0819    
	print("\n"); 
} 
 
1994/0208    
/* 
 *  interrupt time allocation (round data base address to 64 bit boundary) 
 */ 
1993/0526    
Block* 
iallocb(int size) 
{ 
	int pow; 
1994/0208    
	ulong addr; 
1993/0526    
	Chunkl *cl; 
	Chunk *p; 
	Block *b; 
 
	size += sizeof(Block); 
1994/0208    
	size += sizeof(Block) + 7; 
1993/0804    
	for(pow = Minpow; pow <= Maxpow; pow++){ 
1993/0526    
		if(size <= (1<<pow)){ 
			cl = &arena.alloc[pow]; 
1993/1227/sys/src/9/port/qio.c:239,250 – 1994/0208/sys/src/9/port/qio.c:259,273
1993/0527    
			cl->have--; 
			cl->first = p->next; 
1993/0526    
			unlock(cl); 
1994/0208    
 
1993/0526    
			b = (Block *)p; 
1993/0528    
			memset(b, 0, sizeof(Block)); 
1993/0526    
			b->base = (uchar*)(b+1); 
1994/0208    
			addr = (ulong)b; 
			addr = (addr + sizeof(Block) + 7) & ~7; 
			b->base = (uchar*)addr; 
1993/0804    
			b->wp = b->base; 
			b->rp = b->base; 
1993/0526    
			b->lim = b->base + (1<<pow) - sizeof(Block); 
1994/0208    
			b->lim = ((uchar*)b) + (1<<pow); 
1993/0526    
			return b; 
		} 
1993/0804    
	} 
1993/1227/sys/src/9/port/qio.c:268,288 – 1994/0208/sys/src/9/port/qio.c:291,315
1993/0526    
} 
 
/* 
 *  allocate queues and blocks 
1994/0208    
 *  allocate queues and blocks (round data base address to 64 bit boundary) 
1993/0526    
 */ 
Block* 
allocb(int size) 
{ 
	Block *b; 
1994/0208    
	ulong addr; 
1993/0526    
 
	b = malloc(sizeof(Block) + size); 
1994/0208    
	size += sizeof(Block) + 7; 
	b = malloc(size); 
1993/0526    
	if(b == 0) 
		exhausted("Blocks"); 
 
	b->base = (uchar*)(b+1); 
1994/0208    
	addr = (ulong)b; 
	addr = (addr + sizeof(Block) + 7) & ~7; 
	b->base = (uchar*)addr; 
1993/0804    
	b->rp = b->base; 
	b->wp = b->base; 
1993/0526    
	b->lim = b->base + size; 
1994/0208    
	b->lim = ((uchar*)b) + size; 
1993/0527    
	b->flag = 0; 
1993/0526    
 
	return b; 
1993/1227/sys/src/9/port/qio.c:343,354 – 1994/0208/sys/src/9/port/qio.c:370,401
1993/0601    
qproduce(Queue *q, void *vp, int len) 
1993/0526    
{ 
	Block *b; 
1993/0528    
	int dowakeup; 
1994/0208    
	int i, dowakeup; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
1993/0527    
	/* sync with qread */ 
1994/0208    
	dowakeup = 0; 
1993/0526    
	lock(q); 
1993/0527    
 
1994/0208    
	if(q->syncbuf){ 
		/* synchronous communications, just copy into buffer */ 
		if(len < q->synclen) 
			q->synclen = len; 
		i = q->synclen; 
		memmove(q->syncbuf, p, i); 
		q->syncbuf = 0;		/* tell reader buffer is full */ 
		len -= i; 
		if(len <= 0 || (q->state & Qmsg)){ 
			unlock(q); 
			wakeup(&q->rr); 
			return i; 
		} 
 
		/* queue anything that's left */ 
		dowakeup = 1; 
		p += i; 
	} 
 
1993/0526    
	/* no waiting receivers, room in buffer? */ 
	if(q->len >= q->limit){ 
		unlock(q); 
1993/1227/sys/src/9/port/qio.c:379,386 – 1994/0208/sys/src/9/port/qio.c:426,432
1993/0528    
	if(q->state & Qstarve){ 
		q->state &= ~Qstarve; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1994/0208    
	} 
1993/0526    
	unlock(q); 
1993/0527    
 
1993/0601    
	if(dowakeup){ 
1993/1227/sys/src/9/port/qio.c:416,421 – 1994/0208/sys/src/9/port/qio.c:462,475
1993/0526    
} 
 
static int 
1994/0208    
filled(void *a) 
{ 
	Queue *q = a; 
 
	return q->syncbuf == 0; 
} 
 
static int 
1993/0528    
notempty(void *a) 
1993/0526    
{ 
1993/0528    
	Queue *q = a; 
1993/1227/sys/src/9/port/qio.c:436,441 – 1994/0208/sys/src/9/port/qio.c:490,505
1993/0526    
 
	qlock(&q->rlock); 
1993/0527    
	if(waserror()){ 
1994/0208    
		/* can't let go if the buffer is in use */ 
		if(q->syncbuf){ 
			qlock(&q->wlock); 
			x = splhi(); 
			lock(q); 
			q->syncbuf = 0; 
			unlock(q); 
			splx(x); 
			qunlock(&q->wlock); 
		} 
1993/0527    
		qunlock(&q->rlock); 
		nexterror(); 
	} 
1993/1227/sys/src/9/port/qio.c:460,469 – 1994/0208/sys/src/9/port/qio.c:524,546
1993/0528    
			return 0; 
1993/0527    
		} 
 
1993/0528    
		q->state |= Qstarve; 
1993/0526    
		unlock(q); 
		splx(x); 
1993/0528    
		sleep(&q->rr, notempty, q); 
1994/0208    
		if(globalmem(vp)){ 
			/* just let the writer fill the buffer directly */ 
			q->synclen = len; 
			q->syncbuf = vp; 
			unlock(q); 
			splx(x); 
			sleep(&q->rr, filled, q); 
			len = q->synclen; 
			poperror(); 
			qunlock(&q->rlock); 
			return len; 
		} else { 
			q->state |= Qstarve; 
			unlock(q); 
			splx(x); 
			sleep(&q->rr, notempty, q); 
		} 
1993/0526    
	} 
 
1993/0528    
	/* remove a buffered block */ 
1993/1227/sys/src/9/port/qio.c:490,497 – 1994/0208/sys/src/9/port/qio.c:567,573
1993/0811    
	if(b->rp >= b->wp || (q->state&Qmsg)) { 
		poison(b); 
1993/0528    
		free(b); 
1993/0811    
	} 
1993/0526    
	else { 
1994/0208    
	} else { 
1993/0526    
		x = splhi(); 
		lock(q); 
1993/0528    
		b->next = q->bfirst; 
1993/1227/sys/src/9/port/qio.c:521,584 – 1994/0208/sys/src/9/port/qio.c:597,707
1993/0528    
/* 
 *  write to a queue.  if no reader blocks are posted 
 *  queue the data. 
1994/0208    
 * 
 *  all copies should be outside of spl since they can fault. 
1993/0528    
 */ 
long 
1993/0601    
qwrite(Queue *q, void *vp, int len, int nowait) 
1993/0528    
{ 
	int x, dowakeup; 
1994/0208    
	int n, sofar, x, dowakeup; 
1993/0528    
	Block *b; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
1993/0528    
	b = allocb(len); 
	memmove(b->wp, p, len); 
	b->wp += len; 
1994/0208    
	dowakeup = 0; 
1993/0526    
 
1993/0528    
	/* flow control */ 
	while(!qnotfull(q)){ 
1993/0601    
		if(nowait) 
			return len; 
1993/0528    
		qlock(&q->wlock); 
1993/1227    
		if(waserror()) { 
1994/0208    
	if(waserror()){ 
		qunlock(&q->wlock); 
		nexterror(); 
	}; 
	qlock(&q->wlock); 
 
	sofar = 0; 
	if(q->syncbuf){ 
		if(len < q->synclen) 
			sofar = len; 
		else 
			sofar = q->synclen; 
 
		memmove(q->syncbuf, p, sofar); 
		q->synclen = sofar; 
		q->syncbuf = 0; 
		wakeup(&q->rr); 
 
		if(len == sofar || (q->state & Qmsg)){ 
1993/1227    
			qunlock(&q->wlock); 
			nexterror(); 
1994/0208    
			poperror(); 
			return len; 
1993/1227    
		} 
1993/0528    
		q->state |= Qflow; 
		sleep(&q->wr, qnotfull, q); 
		qunlock(&q->wlock); 
1993/1227    
		poperror(); 
1993/0526    
	} 
 
1993/0528    
	x = splhi(); 
	lock(q); 
1994/0208    
	do { 
		n = len-sofar; 
		if(n > 128*1024) 
			n = 128*1024; 
1993/0528    
 
	if(q->state & Qclosed){ 
1994/0208    
		b = allocb(n); 
		memmove(b->wp, p+sofar, n); 
		b->wp += n; 
	 
		/* flow control */ 
		while(!qnotfull(q)){ 
			if(nowait){ 
				free(b); 
				qunlock(&q->wlock); 
				poperror(); 
				return len; 
			} 
			q->state |= Qflow; 
			sleep(&q->wr, qnotfull, q); 
		} 
	 
		x = splhi(); 
		lock(q); 
	 
		if(q->state & Qclosed){ 
			unlock(q); 
			splx(x); 
			error(Ehungup); 
		} 
	 
		if(q->syncbuf){ 
			/* we guessed wrong and did an extra copy */ 
			if(n > q->synclen) 
				n = q->synclen; 
			memmove(q->syncbuf, b->rp, n); 
			q->synclen = n; 
			q->syncbuf = 0; 
			dowakeup = 1; 
			free(b); 
		} else { 
			/* we guessed right, queue it */ 
			if(q->bfirst) 
				q->blast->next = b; 
			else 
				q->bfirst = b; 
			q->blast = b; 
			q->len += n; 
	 
			if(q->state & Qstarve){ 
				q->state &= ~Qstarve; 
				dowakeup = 1; 
			} 
		} 
 
1993/0527    
		unlock(q); 
		splx(x); 
1993/0528    
		error(Ehungup); 
1993/0527    
	} 
1993/0526    
 
1993/0601    
	if(q->bfirst) 
		q->blast->next = b; 
	else 
		q->bfirst = b; 
	q->blast = b; 
1993/0526    
	q->len += len; 
1994/0208    
		if(dowakeup){ 
			if(q->kick) 
				(*q->kick)(q->arg); 
			wakeup(&q->rr); 
		} 
1993/0528    
 
	if(q->state & Qstarve){ 
1993/0526    
		q->state &= ~Qstarve; 
1993/0528    
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1994/0208    
		sofar += n; 
	} while(sofar < len && (q->state & Qmsg) == 0); 
1993/0528    
 
1993/0526    
	unlock(q); 
	splx(x); 
                 
1993/0601    
	if(dowakeup){ 
		if(q->kick) 
			(*q->kick)(q->arg); 
1993/0528    
		wakeup(&q->rr); 
1993/0601    
	} 
1993/0528    
                 
1994/0208    
	qunlock(&q->wlock); 
	poperror(); 
1993/0526    
	return len; 
} 
 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)