plan 9 kernel history: overview | file list | diff list

1990/0617/port/devpipe.c (diff list | history)

port/devpipe.c on 1990/0227
1990/0227    
#include	"u.h" 
#include	"lib.h" 
#include	"mem.h" 
#include	"dat.h" 
#include	"fns.h" 
#include	"errno.h" 
 
#include	"devtab.h" 
#include	"fcall.h" 
 
/* stream queue routines for the pipe device; defined later in this file */
static void pipeiput(Queue *q, Block *bp);
static void pipeoput(Queue *q, Block *bp);
static void pipestclose(Queue *q);
1990/0403    
/*
 *  queue routines handed to streamnew() for every pipe stream.
 *  NOTE(review): the per-slot remarks below are inferred from the
 *  routine names; confirm against the Qinfo declaration.
 */
Qinfo pipeinfo = {
	pipeiput,	/* input-side put routine */
	pipeoput,	/* output-side put routine */
	0,		/* unused slot */
	pipestclose,	/* stream close routine */
	"pipe",		/* name */
};
1990/0227    
 
/*
 *  Device init entry point.  The pipe device has nothing to set up.
 */
void
pipeinit(void)
{
	/* intentionally empty */
}
 
/*
 *  Device reset entry point.  The pipe device keeps no state to reset.
 */
void
pipereset(void)
{
	/* intentionally empty */
}
 
/*
 *  Attach to the pipe device and create the first of the pipe's
 *  two streams.
 *
 *  The second stream is created, and joined to this one, by a
 *  subsequent pipeclone().
 */
Chan*
pipeattach(char *spec)
{
	Chan *first;

	/* channel on device '|' whose qid is built with STREAMQID(0, Sdataqid) */
	first = devattach('|', spec);
	first->qid = STREAMQID(0, Sdataqid);

	/* hang a new stream using the pipe queue routines on the channel */
	streamnew(first, &pipeinfo);
	return first;
}
 
/*
 *  Clone a pipe channel.
 *
 *  The clone is given a brand new stream (the pipe's second one).
 *  The device ends of the two streams are then cross-connected:
 *  the `other' queue of each device queue is pointed at the
 *  opposite stream's device queue.  Returns the new channel.
 */
Chan*
pipeclone(Chan *c, Chan *nc)
{
	Queue *mine, *theirs;

	nc = devclone(c, nc);

	/* build the second stream; close the clone again if that fails */
	if(waserror()){
		close(nc);
		nexterror();
	}
	nc->qid = STREAMQID(1, Sdataqid);
	streamnew(nc, &pipeinfo);
	poperror();

	/* join the two streams back to back */
	mine = c->stream->devq;
	theirs = nc->stream->devq;
	mine->other->next = theirs;
	theirs->other->next = mine;
	return nc;
}
 
/*
 *  Walking is not supported on the pipe device: the attempt is
 *  logged with print() and Egreg is raised.
 *
 *  The trailing return exists only so that this int function never
 *  falls off its end without a value; error() is not expected to
 *  return here.
 */
int
pipewalk(Chan *c, char *name)
{
	print("pipewalk\n");
	error(0, Egreg);
	return 0;	/* not reached */
}
 
/*
 *  Stat a pipe channel: fill in a directory entry for it under the
 *  fixed name "pipe" and convert it into the caller's buffer db.
 */
void
pipestat(Chan *c, char *db)
{
	Dir entry;

	devdir(c, c->qid, "pipe", 0, 0, &entry);
	convD2M(&entry, db);
}
 
/*
 *  Open a pipe channel: start its offset at zero, flag it as open
 *  (COPEN) and record the requested mode.  Always returns c itself.
 */
Chan *
pipeopen(Chan *c, int omode)
{
	c->offset = 0;
	c->flag |= COPEN;
	c->mode = omode;
	return c;
}
 
/*
 *  Create is not supported on the pipe device; every call raises Egreg.
 */
void
pipecreate(Chan *c, char *name, int omode, ulong perm)
{
	error(0, Egreg);
}
 
/*
 *  Remove is not supported on the pipe device; every call raises Egreg.
 */
void
piperemove(Chan *c)
{
	error(0, Egreg);
}
 
/*
 *  wstat is refused on the pipe device: error(0, Eperm) is called
 *  unconditionally.
 */
void 
pipewstat(Chan *c, char *db) 
{ 
1990/0617    
	error(0, Eperm); 
1990/0227    
} 
 
/*
 *  Closing a pipe channel is handed straight to streamclose().
 */
void
pipeclose(Chan *c)
{
	streamclose(c);
}
 
/*
 *  Read up to n bytes from the channel's stream into va.
 *  Returns whatever streamread() returns.
 */
long
piperead(Chan *c, void *va, long n)
{
	long nread;

	nread = streamread(c, va, n);
	return nread;
}
 
/*
 *  Write n bytes from va to the channel's stream with streamwrite()
 *  and return its result.
 *
 *  If an error is raised while the waserror() label is in place,
 *  the note "sys: write on closed pipe" is posted to u->p
 *  (presumably the current process -- confirm) and Egreg is raised
 *  in place of the original error.
 */
long 
pipewrite(Chan *c, void *va, long n) 
{ 
	if(waserror()){ 
		/* error path: notify the process, then report Egreg */
		postnote(u->p, 1, "sys: write on closed pipe", NExit); 
		error(0, Egreg); 
	} 
1990/0513    
	n = streamwrite(c, va, n, 0); 
	poperror(); 
	return n; 
1990/0227    
} 
 
/*
 *  The pipe device has no user-string routine of its own;
 *  the request is passed on to consuserstr().
 */
void
pipeuserstr(Error *e, char *buf)
{
	consuserstr(e, buf);
}
 
/*
 *  The pipe device has no error-string routine of its own;
 *  the request is passed on to rooterrstr().
 */
void
pipeerrstr(Error *e, char *buf)
{
	rooterrstr(e, buf);
}
 
/* 
 *  stream stuff 
 */ 
/* 
 *  send a block up stream to the process. 
 *  sleep until there's room upstream. 
 *
 *  FLOWCTL(q) is applied first and the block is then handed on with
 *  PUTNEXT(q, bp).  presumably FLOWCTL is what waits for room --
 *  TODO confirm against its definition.
 */ 
static void 
pipeiput(Queue *q, Block *bp) 
{ 
1990/0403    
	FLOWCTL(q); 
1990/0227    
	PUTNEXT(q, bp); 
} 
 
/* 
 *  send the block to the other side without letting the connection 
 *  disappear in mid put. 
 *
 *  q stays locked for the whole delivery.  pipestclose() clears a
 *  queue's next pointer only while holding that queue's lock, so the
 *  peer cannot be unlinked while the block is being put.
 *
 *  if the other side is already gone (q->next is 0) the block is
 *  freed and a diagnostic is printed.
 *
 *  NOTE(review): pipeiput() is called with q locked and is documented
 *  as possibly sleeping; confirm that holding this lock across a
 *  sleep is acceptable.
 */ 
static void 
pipeoput(Queue *q, Block *bp) 
{ 
	lock(q); 
	if(q->next) 
		pipeiput(q->next, bp); 
1990/0403    
	else{ 
		/* no peer any more: drop the data */
		print("pipeoput losing block\n"); 
		freeb(bp); 
	} 
1990/0227    
	unlock(q); 
} 
 
/* 
 *  send a hangup and disconnect the streams 
 *
 *  installed as the close routine in pipeinfo.  works in three steps:
 *  divert q's put routine, deliver an M_HANGUP block to the peer,
 *  then clear the next pointers that join the two streams.
 */ 
static void 
pipestclose(Queue *q) 
{ 
	Block *bp; 
 
	/* 
	 *  point to the bit-bucket and let any in-progress 
	 *  writes finish. 
	 */ 
	q->put = nullput; 
	wakeup(&q->r); 
 
	/* 
	 *  send a hangup 
	 *
	 *  from here on q is the partner queue (q->other), the one whose
	 *  next pointer leads to the peer.  the M_HANGUP block is put
	 *  with q locked so the link cannot be cleared underneath it.
	 */ 
	q = q->other; 
	lock(q); 
	if(q->next){ 
		bp = allocb(0); 
		bp->type = M_HANGUP; 
		pipeiput(q->next, bp); 
	} 
	unlock(q); 
 
	/* 
	 *  disconnect (possible livelock?) 
	 *
	 *  each next pointer is cleared while its owning queue is locked.
	 *  the peer's queue is only tried with canlock(); if that fails
	 *  our own lock is dropped and the whole attempt is repeated, so
	 *  this routine never waits for the peer's lock while holding q.
	 *  NOTE(review): the retry is a busy loop with no backoff.
	 */ 
	for(;;){ 
		lock(q); 
		if(q->next){ 
			if(!canlock(q->next->other)){ 
				unlock(q); 
				continue; 
			} 
			/* unlink the peer from us, then us from the peer */
			q->next->other->next = 0; 
			unlock(q->next->other); 
			q->next = 0; 
		} 
		unlock(q); 
		break; 
	} 
} 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)