plan 9 kernel history: overview | file list | diff list

1993/0528/port/devpipe.c (diff list | history)

1993/0501/sys/src/9/port/devpipe.c:6,19 – 1993/0528/sys/src/9/port/devpipe.c:6,22 (short | long | prev | next)
1992/0111    
#include	"../port/error.h" 
1990/0227    
 
#include	"devtab.h" 
1993/0528    
#include	"netif.h" 
1990/0227    
 
1990/1009    
/*
 *  Per-pipe state.  This listing interleaves members from several
 *  dated revisions; the date lines are history annotations, not code.
 */
typedef struct Pipe	Pipe; 
struct Pipe 
{ 
	/* unnamed Ref member; the 1990/1013 clone code calls incref(p) on it */
	Ref; 
1991/1227    
	/* unnamed QLock member; callers use qlock(p)/qunlock(p) on the Pipe itself */
	QLock; 
1990/1009    
	/* link in the pipealloc.pipe list */
	Pipe	*next; 
1993/0528    
	/* channel reference count: set to 1 in attach, ++ in clone, -- in close, all under the qlock */
	int	ref; 
1992/0621    
	/* unique id taken from ++pipealloc.path; used to build the qid path */
	ulong	path; 
1993/0528    
	/* the two data queues: Qdata0 reads q[0] and writes q[1]; Qdata1 reads q[1] and writes q[0] */
	Queue	*q[2]; 
	/* open counts per data file: qref[0] for Qdata0, qref[1] for Qdata1 */
	int	qref[2]; 
1990/1009    
}; 
 
1992/0621    
struct 
1993/0501/sys/src/9/port/devpipe.c:23,48 – 1993/0528/sys/src/9/port/devpipe.c:26,42
1992/0621    
	ulong	path; 
1990/1009    
} pipealloc; 
 
1992/0621    
static Pipe *getpipe(ulong); 
1990/0227    
static void pipeiput(Queue*, Block*); 
static void pipeoput(Queue*, Block*); 
static void pipestclose(Queue *); 
1992/0621    
                 
1990/1113    
/*
 *  Two revisions share the brace lines below:
 *   - the 1990/1113 lines are the `Qinfo pipeinfo' initializer
 *     (pipeiput, pipeoput, 0, pipestclose, "pipe");
 *   - the 1993/0528 lines are an anonymous enum naming the qid types
 *     Qdir, Qdata0 and Qdata1 that the NETTYPE() switches dispatch on.
 */
Qinfo pipeinfo = 
1993/0528    
enum 
1990/1113    
{ 
	pipeiput, 
	pipeoput, 
	0, 
	pipestclose, 
	"pipe" 
1993/0528    
	Qdir, 
	Qdata0, 
	Qdata1, 
1990/1113    
}; 
1990/0227    
 
1992/0621    
/*
 *  Directory table handed to devdirread()/pipegen().
 *  The 1990/11211 lines are the older four-entry table (data, ctl,
 *  data1, ctl1); the 1993/0528 lines are the two-entry table that
 *  replaces it (data -> Qdata0, data1 -> Qdata1).
 */
Dirtab pipedir[] = 
{ 
1990/11211    
	"data",		{Sdataqid},	0,			0600, 
	"ctl",		{Sctlqid},	0,			0600, 
	"data1",	{Sdataqid},	0,			0600, 
	"ctl1",		{Sctlqid},	0,			0600, 
1993/0528    
	"data",		{Qdata0},	0,			0600, 
	"data1",	{Qdata1},	0,			0600, 
1990/1009    
}; 
/*
 *  Must equal the number of entries in the current table: pipegen()
 *  does `tab += i' for every i < ntab, so the old value of 4 would
 *  index past the two-entry 1993/0528 table.
 */
#define NPIPEDIR 2 
 
1993/0501/sys/src/9/port/devpipe.c:66,74 – 1993/0528/sys/src/9/port/devpipe.c:60,84
1990/0227    
	Chan *c; 
1990/0629    
 
1990/0227    
	c = devattach('|', spec); 
1992/0621    
	p = smalloc(sizeof(Pipe)); 
1993/0528    
	p = malloc(sizeof(Pipe)); 
	if(p == 0) 
		exhausted("memory"); 
1992/0621    
	p->ref = 1; 
1990/1009    
 
1993/0528    
	p->q[0] = qopen(64*1024, 0, 0); 
	if(p->q[0] == 0){ 
		free(p); 
		exhausted("memory"); 
	} 
	p->q[0]->state &= ~Qmsg; 
	p->q[1] = qopen(32*1024, 0, 0); 
	if(p->q[1] == 0){ 
		free(p->q[0]); 
		free(p); 
		exhausted("memory"); 
	} 
	p->q[1]->state &= ~Qmsg; 
 
1990/1009    
	lock(&pipealloc); 
1992/0621    
	p->path = ++pipealloc.path; 
	p->next = pipealloc.pipe; 
1993/0501/sys/src/9/port/devpipe.c:75,81 – 1993/0528/sys/src/9/port/devpipe.c:85,92
1992/0621    
	pipealloc.pipe = p; 
1990/1009    
	unlock(&pipealloc); 
 
1992/0621    
	c->qid = (Qid){CHDIR|STREAMQID(2*p->path, 0), 0}; 
1993/0528    
	c->qid = (Qid){CHDIR|NETQID(2*p->path, Qdir), 0}; 
	c->aux = p; 
1991/1227    
	c->dev = 0; 
1990/0227    
	return c; 
} 
1993/0501/sys/src/9/port/devpipe.c:85,94 – 1993/0528/sys/src/9/port/devpipe.c:96,106
1990/0227    
{ 
1990/1009    
	Pipe *p; 
 
1992/0621    
	p = getpipe(STREAMID(c->qid.path)/2); 
1993/0528    
	p = c->aux; 
1990/0227    
	nc = devclone(c, nc); 
1990/1013    
	if(incref(p) <= 1) 
		panic("pipeclone"); 
1993/0528    
	qlock(p); 
	p->ref++; 
	qunlock(p); 
1990/1009    
	return nc; 
} 
1990/0227    
 
1993/0501/sys/src/9/port/devpipe.c:97,109 – 1993/0528/sys/src/9/port/devpipe.c:109,121
1990/1009    
{ 
	int id; 
1990/0629    
 
1990/11211    
	id = STREAMID(c->qid.path); 
1993/0528    
	id = NETID(c->qid.path); 
1990/1009    
	if(i > 1) 
		id++; 
	if(tab==0 || i>=ntab) 
		return -1; 
	tab += i; 
1991/1109    
	devdir(c, (Qid){STREAMQID(id, tab->qid.path),0}, tab->name, tab->length, eve, tab->perm, dp); 
1993/0528    
	devdir(c, (Qid){NETQID(id, tab->qid.path),0}, tab->name, tab->length, eve, tab->perm, dp); 
1990/1009    
	return 1; 
1990/0227    
} 
 
1993/0501/sys/src/9/port/devpipe.c:117,123 – 1993/0528/sys/src/9/port/devpipe.c:129,153
1990/0227    
/*
 *  Write the directory entry for channel c into db.
 *
 *  The 1992/0826 line is the earlier one-line body (streamstat).
 *  The 1993/0528 lines build a Dir with devdir() according to the
 *  qid type and convert it into db with convD2M():
 *	Qdir	"."	length 2*DIRLEN	  mode CHDIR|0555
 *	Qdata0	"data"	length q[0]->len  mode 0660
 *	Qdata1	"data1"	length q[1]->len  mode 0660
 *  Any other qid type panics.
 *
 *  NOTE(review): the data files are reported as 0660 here but listed
 *  as 0600 in pipedir -- confirm which is intended.
 */
void 
pipestat(Chan *c, char *db) 
{ 
1992/0826    
	streamstat(c, db, "pipe", 0666); 
1993/0528    
	Pipe *p; 
	Dir dir; 
 
	/* the Pipe is stored in c->aux by attach */
	p = c->aux; 
 
	switch(NETTYPE(c->qid.path)){ 
	case Qdir: 
		devdir(c, c->qid, ".", 2*DIRLEN, eve, CHDIR|0555, &dir); 
		break; 
	case Qdata0: 
		devdir(c, c->qid, "data", p->q[0]->len, eve, 0660, &dir); 
		break; 
	case Qdata1: 
		devdir(c, c->qid, "data1", p->q[1]->len, eve, 0660, &dir); 
		break; 
	default: 
		panic("pipestat"); 
	} 
	convD2M(&dir, db); 
1990/0227    
} 
 
1990/1009    
/* 
1993/0501/sys/src/9/port/devpipe.c:127,134 – 1993/0528/sys/src/9/port/devpipe.c:157,162
1990/0227    
pipeopen(Chan *c, int omode) 
{ 
1990/1009    
	Pipe *p; 
1992/0621    
	int other; 
1990/1009    
	Stream *local, *remote; 
 
1990/11211    
	if(c->qid.path & CHDIR){ 
1990/1009    
		if(omode != OREAD) 
1993/0501/sys/src/9/port/devpipe.c:139,174 – 1993/0528/sys/src/9/port/devpipe.c:167,183
1990/1009    
		return c; 
	} 
 
1992/0621    
	p = getpipe(STREAMID(c->qid.path)/2); 
1990/1009    
	if(waserror()){ 
1991/1227    
		qunlock(p); 
1990/1009    
		nexterror(); 
	} 
1993/0528    
	p = c->aux; 
1991/1227    
	qlock(p); 
1990/1009    
	streamopen(c, &pipeinfo); 
	local = c->stream; 
	if(local->devq->ptr == 0){ 
		/* 
1991/1227    
		 *  first open, create the other end also 
1990/1009    
		 */ 
1992/0621    
		other = STREAMID(c->qid.path)^1; 
		remote = streamnew(c->type, c->dev, other, &pipeinfo,1); 
1990/1009    
                 
		/* 
		 *  connect the device ends of both streams 
		 */ 
		local->devq->ptr = remote; 
		remote->devq->ptr = local; 
		local->devq->other->next = remote->devq; 
		remote->devq->other->next = local->devq; 
1992/0623    
	} else if(local->opens == 1){ 
1990/1009    
		/* 
1991/1227    
		 *  keep other side around till last close of this side 
1990/1009    
		 */ 
1991/1227    
		streamenter(local->devq->ptr); 
1993/0528    
	switch(NETTYPE(c->qid.path)){ 
	case Qdata0: 
		p->qref[0]++; 
		break; 
	case Qdata1: 
		p->qref[1]++; 
		break; 
1990/1009    
	} 
1991/1227    
	qunlock(p); 
1990/1009    
	poperror(); 
 
1993/0501    
	c->mode = omode&~OTRUNC; 
1990/0227    
	c->flag |= COPEN; 
1993/0501/sys/src/9/port/devpipe.c:201,227 – 1993/0528/sys/src/9/port/devpipe.c:210,253
1990/0227    
pipeclose(Chan *c) 
{ 
	/*
	 *  Close one channel on a pipe.  Lines from several dated
	 *  revisions are interleaved below; in the 1993/0528 lines:
	 *   - under qlock(p), drop the open count of the side named by
	 *     the qid type; when it reaches zero, qclose() that side's
	 *     queue and qhangup() the other side's queue;
	 *   - when both open counts are zero, qreopen() both queues;
	 *   - drop p->ref; on zero, release the qlock, walk the
	 *     pipealloc.pipe list under lock(&pipealloc) (the loop body
	 *     is partly elided by the hunk boundary), and free both
	 *     queues and the Pipe.
	 */
1992/0621    
	Pipe *p, *f, **l; 
1990/11211    
	Stream *remote; 
1990/0629    
 
1992/0621    
	p = getpipe(STREAMID(c->qid.path)/2); 
1993/0528    
	p = c->aux; 
	qlock(p); 
1990/0629    
 
1990/1009    
	/* 
1991/1227    
	 *  take care of local and remote streams 
1993/0528    
	 *  closing either side hangs up the stream 
1990/1009    
	 */ 
1990/11211    
	if(c->stream){ 
1991/1227    
		qlock(p); 
1990/11211    
		remote = c->stream->devq->ptr; 
1991/1227    
		if(streamclose(c) == 0){ 
1991/0314    
			if(remote) 
1992/0711    
				streamexit(remote); 
1993/0528    
	switch(NETTYPE(c->qid.path)){ 
	case Qdata0: 
		p->qref[0]--; 
		if(p->qref[0] == 0){ 
			qclose(p->q[0]); 
			qhangup(p->q[1]); 
1991/0314    
		} 
1991/1227    
		qunlock(p); 
1993/0528    
		break; 
	case Qdata1: 
		p->qref[1]--; 
		if(p->qref[1] == 0){ 
			qclose(p->q[1]); 
			qhangup(p->q[0]); 
		} 
		break; 
1990/11211    
	} 
1990/11161    
 
	/* 
	 *  free the structure 
1993/0528    
	 *  if both sides are closed, they are reusable 
1990/11161    
	 */ 
	if(decref(p) == 0){ 
1993/0528    
	if(p->qref[0] == 0 && p->qref[1] == 0){ 
		qreopen(p->q[0]); 
		qreopen(p->q[1]); 
	} 
 
	/* 
	 *  free the structure on last close 
	 */ 
	p->ref--; 
	if(p->ref == 0){ 
		qunlock(p); 
1990/11161    
		lock(&pipealloc); 
1992/0621    
		l = &pipealloc.pipe; 
		for(f = *l; f; f = f->next) { 
1993/0501/sys/src/9/port/devpipe.c:232,249 – 1993/0528/sys/src/9/port/devpipe.c:258,291
1992/0625    
			l = &f->next; 
1992/0621    
		} 
1990/11161    
		unlock(&pipealloc); 
1993/0528    
		free(p->q[0]); 
		free(p->q[1]); 
1992/0621    
		free(p); 
		/*
		 *  p has been freed and its qlock was already released at
		 *  the top of this branch; falling through to the final
		 *  qunlock(p) would touch freed memory and unlock twice.
		 */
		return; 
1990/11161    
	} 
1993/0528    
 
	qunlock(p); 
1990/0227    
} 
 
/*
 *  Read up to n bytes from channel c into va; offset is unused.
 *
 *  The 1990 and 1992 lines are the earlier body (directory read when
 *  CHDIR is set, otherwise streamread).  The 1993/0528 lines dispatch
 *  on the qid type:
 *	Qdir	devdirread() over pipedir using pipegen
 *	Qdata0	qread() from q[0]
 *	Qdata1	qread() from q[1]
 *  Any other qid type panics.
 */
long 
1991/0411    
piperead(Chan *c, void *va, long n, ulong offset) 
1990/0227    
{ 
1993/0528    
	Pipe *p; 
 
1992/0711    
	USED(offset); 
1990/11211    
	if(c->qid.path & CHDIR) 
1990/1009    
		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen); 
1992/0621    
 
	return streamread(c, va, n); 
1993/0528    
	p = c->aux; 
 
	switch(NETTYPE(c->qid.path)){ 
	case Qdir: 
		return devdirread(c, va, n, pipedir, NPIPEDIR, pipegen); 
	case Qdata0: 
		/* each data file reads from the queue with its own index */
		return qread(p->q[0], va, n); 
	case Qdata1: 
		return qread(p->q[1], va, n); 
	default: 
		panic("piperead"); 
	} 
	return -1;	/* not reached */ 
1990/0227    
} 
 
1990/1009    
/* 
1993/0501/sys/src/9/port/devpipe.c:253,331 – 1993/0528/sys/src/9/port/devpipe.c:295,313
1990/0227    
/*
 *  Write n bytes from va to channel c; offset is unused.
 *
 *  The pre-1993/0528 lines are the earlier stream-based body
 *  (streamwrite, with a "write on closed pipe" note posted on error).
 *  The 1993/0528 lines dispatch on the qid type and write to the
 *  queue with the opposite index, which is the one the other data
 *  file reads in piperead():
 *	Qdata0	qwrite() to q[1]
 *	Qdata1	qwrite() to q[0]
 *  Any other qid type panics.
 */
long 
1991/0411    
pipewrite(Chan *c, void *va, long n, ulong offset) 
1990/0227    
{ 
1993/0528    
	Pipe *p; 
 
1992/0711    
	USED(offset); 
1992/0821    
 
	/* avoid notes when pipe is a mounted stream */ 
	if(c->flag & CMSG) 
		return streamwrite(c, va, n, 0); 
1993/0528    
	p = c->aux; 
1992/0821    
 
1992/0621    
	if(waserror()) { 
1993/0501    
		postnote(up, 1, "sys: write on closed pipe", NUser); 
1990/11211    
		error(Egreg); 
1993/0528    
	switch(NETTYPE(c->qid.path)){ 
	case Qdata0: 
		return qwrite(p->q[1], va, n); 
	case Qdata1: 
		return qwrite(p->q[0], va, n); 
	default: 
		/* name this routine, not piperead, so the panic identifies the failing path */
		panic("pipewrite"); 
1990/0227    
	} 
1990/0513    
	n = streamwrite(c, va, n, 0); 
	poperror(); 
	return n; 
1990/0227    
} 
                 
/* 
1992/0821    
 *  send a block upstream to the process. 
 *  sleep until there's room upstream. 
1990/0227    
 */ 
/* stream input put routine named in the pipeinfo initializer; hands the block to FLOWCTL */
static void 
pipeiput(Queue *q, Block *bp) 
{ 
1992/0305    
	FLOWCTL(q, bp); 
1990/0227    
} 
                 
/* 
1991/1227    
 *  send the block to the other side 
1990/0227    
 */ 
/* stream output put routine named in the pipeinfo initializer; forwards the block with PUTNEXT */
static void 
pipeoput(Queue *q, Block *bp) 
{ 
1990/0629    
	PUTNEXT(q, bp); 
1990/0227    
} 
                 
/* 
 *  send a hangup and disconnect the streams 
 */ 
/*
 *  stream close routine named in the pipeinfo initializer:
 *  redirect q to nullput, wake anything sleeping on q->r, then,
 *  if the other queue has a next queue, send it a zero-length
 *  M_HANGUP block.
 */
static void 
pipestclose(Queue *q) 
{ 
	Block *bp; 
                 
	/* 
	 *  point to the bit-bucket and let any in-progress 
	 *  write's finish. 
	 */ 
	q->put = nullput; 
	wakeup(&q->r); 
                 
	/* 
	 *  send a hangup 
	 */ 
	q = q->other; 
1991/0302    
	/* nothing is connected downstream of the other queue: no one to notify */
	if(q->next == 0) 
		return; 
1990/0629    
	bp = allocb(0); 
	bp->type = M_HANGUP; 
	PUTNEXT(q, bp); 
1992/0621    
} 
                 
/*
 *  Look up a Pipe by its path id.
 *  Walks the pipealloc.pipe list under lock(&pipealloc) and returns
 *  the first entry whose path matches, unlocking before the return.
 *  Panics with "getpipe" if no entry matches.
 */
Pipe* 
getpipe(ulong path) 
{ 
	Pipe *p; 
                 
	lock(&pipealloc); 
	for(p = pipealloc.pipe; p; p = p->next) { 
		if(path == p->path) { 
			unlock(&pipealloc); 
			return p; 
		} 
	} 
	unlock(&pipealloc); 
1992/0625    
	panic("getpipe"); 
	return 0;		/* not reached */ 
1990/0227    
} 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)