plan 9 kernel history: overview | file list | diff list

1993/1227/port/qio.c (diff list | history)

port/qio.c on 1993/0526
1993/0526    
#include	"u.h" 
#include	"../port/lib.h" 
#include	"mem.h" 
#include	"dat.h" 
#include	"fns.h" 
#include	"../port/error.h" 
 
1993/0530    
/* 
 *  interrupt level memory allocation 
 */ 
1993/0526    
typedef struct Chunk	Chunk; 
typedef	struct Chunkl	Chunkl; 
typedef	struct Arena	Arena; 
 
enum 
{ 
	Minpow= 7, 
	Maxpow=	12, 
}; 
 
struct Chunk 
{ 
	Chunk	*next; 
}; 
 
struct Chunkl 
{ 
	Lock; 
	Chunk	*first; 
	int	have; 
	int	goal; 
	int	hist; 
1993/0819    
	int	wanted; 
1993/0526    
}; 
 
struct Arena 
{ 
	Chunkl	alloc[Maxpow+1]; 
	Chunkl	freed; 
	Rendez r; 
}; 
 
static Arena arena; 
 
/* 
1993/0530    
 *  IO queues 
 */ 
typedef struct Block	Block; 
typedef struct Queue	Queue; 
 
struct Block 
{ 
	Block	*next; 
 
	uchar	*rp;			/* first unconsumed byte */ 
	uchar	*wp;			/* first empty byte */ 
	uchar	*lim;			/* 1 past the end of the buffer */ 
	uchar	*base;			/* start of the buffer */ 
	uchar	flag; 
}; 
#define BLEN(b)		((b)->wp - (b)->rp) 
 
struct Queue 
{ 
	Lock; 
 
	Block	*bfirst;	/* buffer */ 
	Block	*blast; 
 
	int	len;		/* bytes in queue */ 
	int	limit;		/* max bytes in queue */ 
	int	state; 
1993/0908    
	int	eof;		/* number of eofs read by user */ 
1993/0530    
 
	void	(*kick)(void*);	/* restart output */ 
	void	*arg;		/* argument to kick */ 
 
	QLock	rlock;		/* mutex for reading processes */ 
	Rendez	rr;		/* process waiting to read */ 
	QLock	wlock;		/* mutex for writing processes */ 
	Rendez	wr;		/* process waiting to write */ 
}; 
 
enum 
{ 
	/* Block.flag */ 
	Bfilled=1,		/* block filled */ 
 
	/* Queue.state */	 
	Qstarve=	(1<<0),		/* consumer starved */ 
	Qmsg=		(1<<1),		/* message stream */ 
	Qclosed=	(1<<2), 
	Qflow=		(1<<3), 
}; 
 
1993/0811    
void 
poison(Block *b) 
{ 
	b->next = (void*)0xdeadbabe; 
	b->rp = (void*)0xdeadbabe; 
	b->wp = (void*)0xdeadbabe; 
	b->lim = (void*)0xdeadbabe; 
	b->base = (void*)0xdeadbabe; 
} 
 
1993/0530    
/* 
1993/0526    
 *  Manage interrupt level memory allocation. 
 */ 
static void 
iallockproc(void *arg) 
{ 
	Chunk *p, *first, **l; 
	Chunkl *cl; 
	int pow, x, i; 
 
	USED(arg); 
	for(;;){ 
		tsleep(&arena.r, return0, 0, 500); 
 
		/* really free what was freed at interrupt level */ 
		cl = &arena.freed; 
		if(cl->first){ 
			x = splhi(); 
			lock(cl); 
			first = cl->first; 
			cl->first = 0; 
			unlock(cl); 
			splx(x); 
	 
1993/0804    
			while(first != 0) { 
1993/0526    
				p = first->next; 
				free(first); 
1993/0804    
				first = p; 
1993/0526    
			} 
		} 
 
		/* make sure we have blocks available for interrupt level */ 
		for(pow = Minpow; pow <= Maxpow; pow++){ 
			cl = &arena.alloc[pow]; 
 
			/* 
			 *  if we've been ahead of the game for a while 
			 *  start giving blocks back to the general pool 
			 */ 
			if(cl->have >= cl->goal){ 
1993/0819    
				cl->hist = ((cl->hist<<1) | 1) & 0xffff; 
				if(cl->hist == 0xffff && cl->goal > 32) 
1993/0526    
					cl->goal--; 
				continue; 
			} else 
				cl->hist <<= 1; 
 
			/* 
1993/0819    
			 *  increase goal if we've been drained. 
1993/0526    
			 */ 
1993/0819    
			if(cl->have == 0){ 
1993/1102    
				i = cl->goal>>1; 
1993/0819    
				if(cl->wanted > i) 
1993/1102    
					cl->goal += 2*cl->wanted; 
1993/0819    
				else 
					cl->goal += i; 
				cl->wanted = 0; 
			} 
1993/0526    
 
			first = 0; 
			l = &first; 
1993/0804    
			i = cl->goal - cl->have; 
			for(x = i; x > 0; x--){ 
1993/0526    
				p = malloc(1<<pow); 
				if(p == 0) 
					break; 
1993/0804    
 
1993/0526    
				*l = p; 
				l = &p->next; 
			} 
1993/0819    
			i -= x; 
1993/0526    
			if(first){ 
				x = splhi(); 
				lock(cl); 
				*l = cl->first; 
				cl->first = first; 
				cl->have += i; 
				unlock(cl); 
				splx(x); 
			} 
		} 
	} 
} 
 
void 
iallocinit(void) 
{ 
	int pow; 
	Chunkl *cl; 
 
	for(pow = Minpow; pow <= Maxpow; pow++){ 
		cl = &arena.alloc[pow]; 
1993/1102    
		cl->goal = Maxpow-pow + 16; 
1993/0526    
	} 
 
	/* start garbage collector */ 
1993/0725    
	kproc("ialloc", iallockproc, 0); 
1993/0526    
} 
 
1993/0819    
void 
ixsummary(void) 
{ 
	int pow; 
	Chunkl *cl; 
 
	print("size	have/goal\n"); 
	for(pow = Minpow; pow <= Maxpow; pow++){ 
		cl = &arena.alloc[pow]; 
		print("%d	%d/%d\n", 1<<pow, cl->have, cl->goal); 
	} 
	print("\n"); 
} 
 
1993/0526    
Block* 
iallocb(int size) 
{ 
	int pow; 
	Chunkl *cl; 
	Chunk *p; 
	Block *b; 
 
	size += sizeof(Block); 
1993/0804    
	for(pow = Minpow; pow <= Maxpow; pow++){ 
1993/0526    
		if(size <= (1<<pow)){ 
			cl = &arena.alloc[pow]; 
			lock(cl); 
			p = cl->first; 
1993/0527    
			if(p == 0){ 
1993/0819    
				cl->wanted++; 
1993/0527    
				unlock(cl); 
1993/0819    
				wakeup(&arena.r); 
1993/0527    
				return 0; 
1993/0526    
			} 
1993/0527    
			cl->have--; 
			cl->first = p->next; 
1993/0526    
			unlock(cl); 
			b = (Block *)p; 
1993/0528    
			memset(b, 0, sizeof(Block)); 
1993/0526    
			b->base = (uchar*)(b+1); 
1993/0804    
			b->wp = b->base; 
			b->rp = b->base; 
1993/0526    
			b->lim = b->base + (1<<pow) - sizeof(Block); 
			return b; 
		} 
1993/0804    
	} 
 
1993/0526    
	panic("iallocb %d\n", size); 
	return 0;			/* not reached */ 
} 
 
void 
ifree(void *a) 
{ 
	Chunk *p; 
	Chunkl *cl; 
 
	cl = &arena.freed; 
	p = a; 
	lock(cl); 
	p->next = cl->first; 
	cl->first = p; 
	unlock(cl); 
} 
 
/* 
 *  allocate queues and blocks 
 */ 
Block* 
allocb(int size) 
{ 
	Block *b; 
 
	b = malloc(sizeof(Block) + size); 
	if(b == 0) 
		exhausted("Blocks"); 
 
	b->base = (uchar*)(b+1); 
1993/0804    
	b->rp = b->base; 
	b->wp = b->base; 
1993/0526    
	b->lim = b->base + size; 
1993/0527    
	b->flag = 0; 
1993/0526    
 
	return b; 
} 
 
/* 
 *  Interrupt level copy out of a queue, return # bytes copied.  If drop is 
 *  set, any bytes left in a block afer a consume are discarded. 
 */ 
int 
1993/0601    
qconsume(Queue *q, void *vp, int len) 
1993/0526    
{ 
	Block *b; 
1993/0528    
	int n, dowakeup; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
1993/0527    
	/* sync with qwrite */ 
1993/0526    
	lock(q); 
1993/0527    
 
1993/0526    
	b = q->bfirst; 
	if(b == 0){ 
		q->state |= Qstarve; 
		unlock(q); 
		return -1; 
	} 
1993/0528    
 
1993/0526    
	n = BLEN(b); 
	if(n < len) 
		len = n; 
	memmove(p, b->rp, len); 
1993/0527    
	if((q->state & Qmsg) || len == n) 
1993/0526    
		q->bfirst = b->next; 
	else 
		b->rp += len; 
	q->len -= len; 
 
1993/0528    
	/* if writer flow controlled, restart */ 
	if((q->state & Qflow) && q->len < q->limit/2){ 
		q->state &= ~Qflow; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1993/0526    
 
	unlock(q); 
 
1993/0528    
	if(dowakeup) 
		wakeup(&q->wr); 
 
	/* discard the block if we're done with it */ 
1993/0811    
	if((q->state & Qmsg) || len == n) { 
		poison(b); 
1993/0526    
		ifree(b); 
1993/0811    
	} 
1993/0526    
	return len; 
} 
 
1993/0528    
int 
1993/0601    
qproduce(Queue *q, void *vp, int len) 
1993/0526    
{ 
	Block *b; 
1993/0528    
	int dowakeup; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
1993/0527    
	/* sync with qread */ 
1993/0526    
	lock(q); 
1993/0527    
 
1993/0526    
	/* no waiting receivers, room in buffer? */ 
	if(q->len >= q->limit){ 
		unlock(q); 
		return -1; 
	} 
 
	/* save in buffer */ 
	b = q->bfirst; 
1993/0527    
	if((q->state & Qmsg) == 0 && b && b->lim - b->wp <= len){ 
1993/0526    
		memmove(b->wp, p, len); 
		b->wp += len; 
1993/0527    
		b->flag |= Bfilled; 
1993/0526    
	} else { 
		b = iallocb(len); 
		if(b == 0){ 
			unlock(q); 
1993/1103    
			return -2; 
1993/0526    
		} 
1993/0527    
		memmove(b->wp, p, len); 
1993/0526    
		b->wp += len; 
		if(q->bfirst) 
			q->blast->next = b; 
		else 
			q->bfirst = b; 
		q->blast = b; 
	} 
	q->len += len; 
1993/0528    
	if(q->state & Qstarve){ 
		q->state &= ~Qstarve; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1993/0526    
	unlock(q); 
1993/0527    
 
1993/0601    
	if(dowakeup){ 
		if(q->kick) 
			(*q->kick)(q->arg); 
1993/0528    
		wakeup(&q->rr); 
1993/0601    
	} 
1993/0528    
 
1993/0526    
	return len; 
} 
 
/* 
 *  called by non-interrupt code 
 */ 
Queue* 
1993/0530    
qopen(int limit, int msg, void (*kick)(void*), void *arg) 
1993/0526    
{ 
	Queue *q; 
 
	q = malloc(sizeof(Queue)); 
	if(q == 0) 
1993/0528    
		return 0; 
 
	memset(q, 0, sizeof(Queue)); 
1993/0526    
	q->limit = limit; 
	q->kick = kick; 
	q->arg = arg; 
1993/0530    
	q->state = msg ? Qmsg : 0; 
1993/0601    
	q->state |= Qstarve; 
1993/0908    
	q->eof = 0; 
1993/0526    
 
	return q; 
} 
 
static int 
1993/0528    
notempty(void *a) 
1993/0526    
{ 
1993/0528    
	Queue *q = a; 
1993/0526    
 
1993/0528    
	return q->bfirst != 0; 
1993/0526    
} 
 
1993/0528    
/* 
 *  read a queue.  if no data is queued, post a Block 
 *  and wait on its Rendez. 
 */ 
1993/0526    
long 
1993/0601    
qread(Queue *q, void *vp, int len) 
1993/0526    
{ 
1993/0528    
	Block *b; 
	int x, n, dowakeup; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
	qlock(&q->rlock); 
1993/0527    
	if(waserror()){ 
		qunlock(&q->rlock); 
		nexterror(); 
	} 
1993/0526    
 
1993/0528    
	/* wait for data */ 
	for(;;){ 
		/* sync with qwrite/qproduce */ 
		x = splhi(); 
		lock(q); 
1993/0526    
 
1993/0728    
		b = q->bfirst; 
		if(b) 
			break; 
 
1993/0528    
		if(q->state & Qclosed){ 
1993/0527    
			unlock(q); 
			splx(x); 
1993/0908    
			poperror(); 
			qunlock(&q->rlock); 
			if(++q->eof > 3) 
				error(Ehungup); 
1993/0528    
			return 0; 
1993/0527    
		} 
 
1993/0528    
		q->state |= Qstarve; 
1993/0526    
		unlock(q); 
		splx(x); 
1993/0528    
		sleep(&q->rr, notempty, q); 
1993/0526    
	} 
 
1993/0528    
	/* remove a buffered block */ 
	q->bfirst = b->next; 
	n = BLEN(b); 
1993/0526    
	q->len -= n; 
1993/0528    
 
	/* if writer flow controlled, restart */ 
	if((q->state & Qflow) && q->len < q->limit/2){ 
		q->state &= ~Qflow; 
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
1993/0526    
	unlock(q); 
	splx(x); 
1993/0527    
 
	/* do this outside of the lock(q)! */ 
1993/0528    
	if(n > len) 
		n = len; 
	memmove(p, b->rp, n); 
	b->rp += n; 
1993/0526    
 
1993/0725    
	/* free it or put what's left on the queue */ 
1993/0811    
	if(b->rp >= b->wp || (q->state&Qmsg)) { 
		poison(b); 
1993/0528    
		free(b); 
1993/0811    
	} 
1993/0526    
	else { 
		x = splhi(); 
		lock(q); 
1993/0528    
		b->next = q->bfirst; 
		q->bfirst = b; 
		q->len += BLEN(b); 
1993/0526    
		unlock(q); 
		splx(x); 
	} 
1993/0527    
 
1993/0528    
	/* wakeup flow controlled writers (with a bit of histeresis) */ 
	if(dowakeup) 
		wakeup(&q->wr); 
 
1993/0527    
	poperror(); 
1993/0526    
	qunlock(&q->rlock); 
	return n; 
} 
 
1993/0528    
static int 
qnotfull(void *a) 
1993/0526    
{ 
1993/0528    
	Queue *q = a; 
1993/0526    
 
1993/0528    
	return q->len < q->limit; 
} 
1993/0527    
 
1993/0528    
/* 
 *  write to a queue.  if no reader blocks are posted 
 *  queue the data. 
 */ 
long 
1993/0601    
qwrite(Queue *q, void *vp, int len, int nowait) 
1993/0528    
{ 
	int x, dowakeup; 
	Block *b; 
1993/0601    
	uchar *p = vp; 
1993/0526    
 
1993/0528    
	b = allocb(len); 
	memmove(b->wp, p, len); 
	b->wp += len; 
1993/0526    
 
1993/0528    
	/* flow control */ 
	while(!qnotfull(q)){ 
1993/0601    
		if(nowait) 
			return len; 
1993/0528    
		qlock(&q->wlock); 
1993/1227    
		if(waserror()) { 
			qunlock(&q->wlock); 
			nexterror(); 
		} 
1993/0528    
		q->state |= Qflow; 
		sleep(&q->wr, qnotfull, q); 
		qunlock(&q->wlock); 
1993/1227    
		poperror(); 
1993/0526    
	} 
 
1993/0528    
	x = splhi(); 
	lock(q); 
 
	if(q->state & Qclosed){ 
1993/0527    
		unlock(q); 
		splx(x); 
1993/0528    
		error(Ehungup); 
1993/0527    
	} 
1993/0526    
 
1993/0601    
	if(q->bfirst) 
		q->blast->next = b; 
	else 
		q->bfirst = b; 
	q->blast = b; 
1993/0526    
	q->len += len; 
1993/0528    
 
	if(q->state & Qstarve){ 
1993/0526    
		q->state &= ~Qstarve; 
1993/0528    
		dowakeup = 1; 
	} else 
		dowakeup = 0; 
 
1993/0526    
	unlock(q); 
	splx(x); 
 
1993/0601    
	if(dowakeup){ 
		if(q->kick) 
			(*q->kick)(q->arg); 
1993/0528    
		wakeup(&q->rr); 
1993/0601    
	} 
1993/0528    
 
1993/0526    
	return len; 
} 
 
1993/0528    
/* 
 *  Mark a queue as closed.  No further IO is permitted. 
 *  All blocks are released. 
 */ 
void 
qclose(Queue *q) 
1993/0527    
{ 
1993/0528    
	int x; 
	Block *b, *bfirst; 
1993/0527    
 
1993/0528    
	/* mark it */ 
	x = splhi(); 
	lock(q); 
	q->state |= Qclosed; 
	bfirst = q->bfirst; 
	q->bfirst = 0; 
	unlock(q); 
	splx(x); 
1993/0527    
 
1993/0528    
	/* free queued blocks */ 
1993/0804    
	while(bfirst){ 
		b = bfirst->next; 
1993/0811    
		poison(bfirst); 
1993/0804    
		free(bfirst); 
		bfirst = b; 
1993/0526    
	} 
 
1993/0528    
	/* wake up readers/writers */ 
	wakeup(&q->rr); 
	wakeup(&q->wr); 
} 
1993/0527    
 
1993/0528    
/* 
 *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued 
 *  blocks. 
 */ 
void 
qhangup(Queue *q) 
{ 
	int x; 
1993/0526    
 
1993/0528    
	/* mark it */ 
	x = splhi(); 
	lock(q); 
	q->state |= Qclosed; 
	unlock(q); 
	splx(x); 
1993/0527    
 
1993/0528    
	/* wake up readers/writers */ 
	wakeup(&q->rr); 
	wakeup(&q->wr); 
} 
 
/* 
 *  mark a queue as no longer hung up 
 */ 
void 
qreopen(Queue *q) 
{ 
	q->state &= ~Qclosed; 
1993/0601    
	q->state |= Qstarve; 
1993/0908    
	q->eof = 0; 
1993/0530    
} 
 
/* 
 *  return bytes queued 
 */ 
int 
qlen(Queue *q) 
{ 
	return q->len; 
1993/0601    
} 
 
/* 
 *  return true if we can read without blocking 
 */ 
int 
qcanread(Queue *q) 
{ 
	return q->bfirst!=0; 
1993/0526    
} 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)