plan 9 kernel history: overview | file list | diff list

1992/0111/port/devmux.c (diff list | history)

port/devmux.c on 1991/1114
1991/1114    
#include	"u.h" 
#include	"lib.h" 
#include	"mem.h" 
#include	"dat.h" 
#include	"fns.h" 
1992/0111    
#include	"../port/error.h" 
1991/1114    
#include	"fcall.h" 
 
#include	"devtab.h" 
 
typedef struct Mux Mux; 
typedef struct Con Con; 
typedef struct Dtq Dtq; 
 
enum 
{ 
	Qdir	= 0, 
1991/1115    
	Qhead	= 0, 
1991/1114    
	Qclone, 
1991/1115    
	Qoffset, 
1991/1114    
}; 
 
enum 
{ 
1991/1117    
	Nmux	=	32, 
1991/1115    
	Maxmsg	=	(32*1024), 
	Flowctl	=	Maxmsg/2, 
1991/1114    
}; 
 
struct Dtq 
{ 
	QLock	rd; 
	Rendez	r; 
	Lock	listlk; 
	Block	*list; 
	int	ndelim; 
1991/1115    
	int	nb; 
	QLock	flow; 
	Rendez	flowr; 
1991/1114    
}; 
 
struct Con 
{ 
	int	ref; 
	char	user[NAMELEN]; 
	ulong	perm; 
	Dtq	conq; 
}; 
 
struct Mux 
{ 
	Ref; 
1991/1117    
	int	srv; 
1991/1114    
	char	name[NAMELEN]; 
	char	user[NAMELEN]; 
	ulong	perm; 
	int	headopen; 
	Dtq	headq; 
	Con	connects[Nmux]; 
1991/1117    
	Chan	*c; 
1991/1114    
}; 
 
Mux	*muxes; 
ulong	muxreadq(Mux *m, Dtq*, char*, ulong); 
void	muxwriteq(Dtq*, char*, long, int, int); 
1991/1115    
void	muxflow(Dtq*); 
Block  *muxclq(Dtq *q); 
1991/1114    
 
#define NMUX(c)		(((c->qid.path>>8)&0xffff)-1) 
1991/1115    
#define NQID(m, c)	(Qid){(m+1)<<8|(c)&0xff, 0} 
#define DQID(m)		(Qid){(m+1)<<8|CHDIR, 0} 
1991/1114    
#define NCON(c)		(c->qid.path&0xff) 
 
int 
muxgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp) 
{ 
	Mux *m; 
	int mux; 
	Con *cm; 
	char buf[10]; 
1991/1115    
	int nq; 
1991/1114    
 
	if(c->qid.path == CHDIR) { 
		if(s >= conf.nmux) 
			return -1; 
 
		m = &muxes[s]; 
		if(m->name[0] == '\0') 
			return 0; 
1991/1117    
		if(m->srv) 
			devdir(c, NQID(s, 0), m->name, 0, m->user, m->perm, dp); 
		else 
			devdir(c, DQID(s), m->name, 0, m->user, m->perm, dp); 
1991/1114    
		return 1; 
	} 
 
	if(s >= Nmux+2) 
		return -1; 
 
	mux = NMUX(c); 
	m = &muxes[mux]; 
1991/1115    
 
1991/1114    
	switch(s) { 
	case Qhead: 
1991/1115    
		devdir(c, NQID(mux, Qhead), "head", m->headq.nb, m->user, m->perm, dp); 
1991/1114    
		break; 
	case Qclone: 
		devdir(c, NQID(mux, Qclone), "clone", 0, m->user, m->perm, dp); 
		break; 
	default: 
1991/1115    
		nq = s-Qoffset; 
		cm = &m->connects[nq]; 
1991/1114    
		if(cm->ref == 0) 
			return 0; 
1991/1115    
		sprint(buf, "%d", nq); 
		devdir(c, NQID(mux, Qoffset+s), buf, cm->conq.nb, cm->user, cm->perm, dp); 
1991/1114    
		break; 
	} 
	return 1; 
} 
 
void 
muxinit(void) 
{ 
} 
 
void 
muxreset(void) 
{ 
	muxes = ialloc(conf.nmux*sizeof(Mux), 0); 
} 
 
Chan * 
muxattach(char *spec) 
{ 
	Chan *c; 
 
	c = devattach('m', spec); 
	c->qid.path = CHDIR|Qdir; 
	return c; 
} 
 
Chan * 
muxclone(Chan *c, Chan *nc) 
{ 
	int ncon; 
	Mux *m; 
 
	if(c->qid.path == CHDIR) 
		return devclone(c, nc);; 
 
	m = &muxes[NMUX(c)]; 
	ncon = NCON(c); 
	c = devclone(c, nc); 
1991/1115    
 
	if((c->flag&COPEN) == 0) 
		return c; 
 
1991/1114    
	switch(ncon) { 
	case Qhead: 
		incref(m); 
		break; 
	case Qclone: 
		break; 
	default: 
		lock(m); 
1991/1115    
		m->connects[ncon-Qoffset].ref++; 
1991/1114    
		m->ref++; 
		unlock(m); 
	} 
	return c; 
} 
 
int 
muxwalk(Chan *c, char *name) 
{ 
	if(strcmp(name, "..") == 0) { 
		c->qid.path = CHDIR|Qdir; 
		return 1; 
	} 
 
	return devwalk(c, name, 0, 0, muxgen); 
} 
 
void 
muxstat(Chan *c, char *db) 
{ 
	devstat(c, db, 0, 0, muxgen); 
} 
 
Chan * 
muxopen(Chan *c, int omode) 
{ 
	Mux *m; 
	Con *cm, *e; 
1991/1115    
	int mux, ok; 
1991/1117    
	Chan *new; 
1991/1114    
 
1991/1117    
	c = devopen(c, omode, 0, 0, muxgen); 
1991/1114    
	if(c->qid.path & CHDIR) 
1991/1117    
		return c; 
1991/1114    
 
1991/1115    
	mux = NMUX(c); 
	m = &muxes[mux]; 
1991/1117    
	lock(m); 
	if(waserror()) { 
		c->flag &= ~COPEN; 
		unlock(m); 
		nexterror(); 
	} 
	if(m->srv) { 
		if(m->c == 0) 
			error(Eshutdown); 
		new = m->c; 
		incref(new); 
		unlock(m); 
		poperror(); 
		close(c); 
		return new; 
	} 
1991/1114    
	switch(NCON(c)) { 
	case Qhead: 
		if(m->headopen) 
			errors("server channel busy"); 
1991/1117    
		m->headopen = 1; 
		m->ref++; 
1991/1114    
		break; 
	case Qclone: 
		if(m->headopen == 0) 
			errors("server shutdown"); 
 
		cm = m->connects; 
		for(e = &cm[Nmux]; cm < e; cm++) 
			if(cm->ref == 0) 
				break; 
1991/1117    
		if(cm == e) 
1991/1114    
			errors("all cannels busy"); 
1991/1115    
		cm->ref = 1; 
1991/1114    
		m->ref++; 
		strncpy(cm->user, u->p->user, NAMELEN); 
		cm->perm = 0600; 
1991/1115    
		c->qid = NQID(mux, (cm-m->connects)+Qoffset); 
1991/1114    
		break; 
	default: 
1991/1115    
		cm = &m->connects[NCON(c)-Qoffset]; 
1991/1114    
		cm->ref++; 
1991/1115    
		m->ref++; 
1991/1114    
		break; 
	} 
1991/1117    
	unlock(m); 
	poperror(); 
1991/1114    
	return c; 
} 
 
void 
muxcreate(Chan *c, char *name, int omode, ulong perm) 
{ 
	int n; 
	Mux *m, *e; 
 
	if(c->qid.path != CHDIR) 
		error(Eperm); 
 
	omode = openmode(omode); 
 
	m = muxes; 
	for(e = &m[conf.nmux]; m < e; m++) { 
1991/1115    
		if(m->name[0] == '\0' && m->ref == 0 && canlock(m)) { 
1991/1114    
			if(m->ref != 0) { 
				unlock(m); 
				continue; 
			} 
			break; 
		}	 
	} 
 
	if(m == e) 
		errors("no multiplexors"); 
 
	strncpy(m->name, name, NAMELEN); 
	strncpy(m->user, u->p->user, NAMELEN); 
	m->perm = perm&~CHDIR; 
1991/1117    
	m->srv = 1; 
	if(perm&CHDIR) 
		m->srv = 0; 
1991/1114    
	unlock(m); 
 
	n = m - muxes; 
1991/1117    
	c->qid = (Qid){(CHDIR&perm)|(n+1)<<8, 0}; 
1991/1114    
	c->flag |= COPEN; 
	c->mode = omode; 
} 
 
void 
muxremove(Chan *c) 
{ 
	Mux *m; 
1991/1117    
	Chan *srv; 
1991/1114    
 
1991/1117    
	if(c->qid.path == CHDIR)  
1991/1114    
		error(Eperm); 
 
	m = &muxes[NMUX(c)]; 
1991/1117    
	if((c->qid.path&CHDIR) == 0 && m->srv == 0) 
		error(Eperm); 
		 
1991/1114    
	if(strcmp(u->p->user, m->user) != 0) 
		errors("not owner"); 
 
1991/1117    
	srv = 0; 
	lock(m); 
	if(m->srv) { 
		srv = m->c; 
		m->c = 0; 
	} 
1991/1114    
	m->name[0] = '\0'; 
1991/1117    
	unlock(m); 
	if(srv) 
		close(srv); 
 
	muxclose(c); 
1991/1114    
} 
 
void 
muxwstat(Chan *c, char *db) 
{ 
	Mux *m; 
	Dir d; 
	int nc; 
 
	if(c->qid.path == CHDIR) 
		error(Eperm); 
 
	m = &muxes[NMUX(c)]; 
	if(strcmp(u->p->user, m->user) != 0) 
		errors("not owner"); 
 
	convM2D(db, &d); 
	d.mode &= 0777; 
1991/1117    
	if(c->qid.path&CHDIR || m->srv) { 
1991/1114    
		strcpy(m->name, d.name); 
		m->perm = d.mode; 
		return; 
	} 
	nc = NCON(c); 
	switch(nc) { 
	case Qclone: 
		error(Eperm); 
	case Qhead: 
		m->perm = d.mode; 
		break; 
	default: 
1991/1115    
		m->connects[nc-Qoffset].perm = d.mode; 
1991/1114    
		break; 
	} 
} 
 
void 
muxclose(Chan *c) 
{ 
	Block *f1, *f2; 
	Con *cm, *e; 
	Mux *m; 
	int nc; 
 
1991/1115    
	if(c->qid.path&CHDIR) 
1991/1114    
		return; 
 
1991/1117    
	m = &muxes[NMUX(c)]; 
	if(!(c->flag&COPEN) || m->srv) 
1991/1115    
		return; 
 
1991/1114    
	nc = NCON(c); 
	f1 = 0; 
	f2 = 0; 
	switch(nc) { 
	case Qhead: 
		m->headopen = 0; 
		cm = m->connects; 
		for(e = &cm[Nmux]; cm < e; cm++) 
			if(cm->ref) 
				wakeup(&cm->conq.r); 
		lock(m); 
1991/1115    
		if(--m->ref == 0) 
			f1 = muxclq(&m->headq); 
1991/1114    
		unlock(m); 
		break; 
	case Qclone: 
1991/1115    
		break; 
1991/1114    
	default: 
		lock(m); 
1991/1115    
		cm = &m->connects[nc-Qoffset]; 
		if(--cm->ref == 0) 
			f1 = muxclq(&cm->conq); 
		if(--m->ref == 0) 
			f1 = muxclq(&m->headq); 
1991/1114    
		unlock(m); 
	} 
	if(f1) 
		freeb(f1); 
	if(f2) 
		freeb(f2); 
} 
 
long 
muxread(Chan *c, void *va, long n, ulong offset) 
{ 
	Mux *m; 
	Con *cm; 
	int bread; 
 
	if(c->qid.path & CHDIR) 
		return devdirread(c, va, n, 0, 0, muxgen); 
 
	m = &muxes[NMUX(c)]; 
	switch(NCON(c)) { 
	case Qhead: 
		bread = muxreadq(m, &m->headq, va, n); 
		break; 
	case Qclone: 
		error(Eperm); 
	default: 
1991/1115    
		cm = &m->connects[NCON(c)-Qoffset]; 
1991/1114    
		bread = muxreadq(m, &cm->conq, va, n); 
		break; 
	} 
 
	return bread; 
} 
 
Con * 
muxhdr(Mux *m, char *h) 
{ 
	Con *c; 
 
1991/1115    
	if(h[0] != Tmux || h[2] != 0) 
1991/1114    
		error(Ebadmsg); 
 
	c = &m->connects[h[1]]; 
	if(c < m->connects || c > &m->connects[Nmux])	 
		error(Ebadmsg); 
 
	if(c->ref == 0) 
		return 0; 
 
	return c; 
} 
 
long 
muxwrite(Chan *c, void *va, long n, ulong offset) 
{ 
	Mux *m; 
	Con *cm; 
1991/1117    
	int muxid, fd; 
1991/1114    
	Block *f, *bp; 
1991/1117    
	char *a, hdr[3], buf[10]; 
1991/1114    
 
1991/1115    
	if(c->qid.path&CHDIR) 
1991/1114    
		error(Eisdir); 
 
1991/1117    
	m = &muxes[NMUX(c)]; 
	if(n > Maxmsg || (m->srv && n >= sizeof(buf))) 
1991/1115    
		error(Etoobig); 
 
1991/1117    
	if(m->srv) { 
		memmove(buf, va, n);		/* so we can NUL-terminate */ 
		buf[n] = 0; 
		fd = strtoul(buf, 0, 0); 
		fdtochan(fd, -1, 0);		/* error check */ 
		m->c = u->p->fgrp->fd[fd]; 
		incref(m->c); 
		return n; 
	} 
1991/1114    
	switch(NCON(c)) { 
	case Qclone: 
		error(Eperm); 
	case Qhead: 
		if(n < 2) 
			error(Ebadmsg); 
 
		a = (char*)va; 
		memmove(hdr, a, sizeof(hdr)); 
		cm = muxhdr(m, hdr); 
		if(cm == 0) 
			error(Ehungup); 
 
		muxwriteq(&cm->conq, a+sizeof(hdr), n-sizeof(hdr), 0, 0); 
		break; 
	default: 
		if(m->headopen == 0) 
			error(Ehungup); 
 
1991/1115    
		muxid = NCON(c)-Qoffset; 
1991/1114    
		muxwriteq(&m->headq, va, n, 1, muxid); 
		break; 
	} 
 
	return n; 
} 
 
void 
muxwriteq(Dtq *q, char *va, long n, int addid, int muxid) 
{ 
	Block *head, *tail, *bp; 
1991/1115    
	ulong l, bwrite; 
1991/1114    
 
	head = 0; 
1991/1117    
	tail = 0; 
1991/1114    
	if(waserror()) { 
		if(head) 
			freeb(head); 
		nexterror(); 
	} 
 
1991/1115    
	bwrite = 0; 
1991/1114    
	while(n) { 
1991/1115    
		if(addid) { 
			bp = allocb(n+3); 
			bp->wptr[0] = Tmux; 
			bp->wptr[1] = muxid; 
			bp->wptr[2] = 0; 
			bp->wptr += 3; 
			bwrite += 3; 
			addid = 0; 
		} 
		else 
			bp = allocb(n); 
1991/1114    
		l = bp->lim - bp->wptr; 
1991/1115    
		if(l > n) 
			l = n; 
1991/1114    
		memmove(bp->wptr, va, l);	/* Interruptable thru fault */ 
		va += l; 
		bp->wptr += l; 
1991/1115    
		bwrite += l; 
1991/1114    
		n -= l; 
		if(head == 0) 
			head = bp; 
		else 
			tail->next = bp; 
		tail = bp; 
	} 
	poperror(); 
	tail->flags |= S_DELIM; 
1991/1115    
 
	if(q->nb > Flowctl) 
		muxflow(q); 
 
1991/1114    
	lock(&q->listlk); 
1991/1115    
	if(q->list == 0) 
		q->list = head; 
	else { 
		for(tail = q->list; tail->next; tail = tail->next) 
			; 
		tail->next = head; 
	} 
1991/1114    
	q->ndelim++; 
1991/1115    
	q->nb += bwrite; 
1991/1114    
	unlock(&q->listlk); 
1991/1115    
	wakeup(&q->r); 
1991/1114    
} 
 
int 
1991/1115    
muxflw(Dtq *q) 
1991/1114    
{ 
1991/1115    
	return q->nb < Flowctl; 
} 
 
void 
muxflow(Dtq *q) 
{ 
	qlock(&q->flow); 
	if(waserror()) { 
		qunlock(&q->flow); 
		nexterror(); 
	} 
	sleep(&q->flowr, muxflw, q); 
	poperror(); 
	qunlock(&q->flow); 
} 
 
int 
havedata(Dtq *q) 
{ 
1991/1114    
	int n; 
 
	lock(&q->listlk); 
	n = q->ndelim; 
	unlock(&q->listlk); 
	return n; 
} 
 
ulong 
muxreadq(Mux *m, Dtq *q, char *va, ulong n) 
{ 
	int l, nread, gotdelim; 
1991/1115    
	Block *bp, *f1; 
1991/1114    
 
	qlock(&q->rd); 
	bp = 0; 
	if(waserror()) { 
		lock(&q->listlk); 
		if(bp) { 
			bp->next = q->list; 
			q->list = bp; 
		} 
		unlock(&q->listlk); 
1991/1117    
		qunlock(&q->rd); 
1991/1114    
		nexterror(); 
	} 
1991/1115    
	while(!havedata(q)) { 
		sleep(&q->r, havedata, q); 
		if(m->headopen == 0) 
			errors("server shutdown"); 
	} 
1991/1114    
 
	nread = 0; 
1991/1115    
	f1 = 0; 
	lock(&q->listlk); 
1991/1114    
	while(n) { 
		bp = q->list; 
		q->list = bp->next; 
		bp->next = 0; 
		unlock(&q->listlk); 
1991/1115    
		if(f1) { 
			freeb(f1); 
			f1 = 0; 
		} 
1991/1114    
		l = BLEN(bp); 
1991/1115    
		if(l > n) 
			l = n; 
1991/1114    
		memmove(va, bp->rptr, l);	/* Interruptable thru fault */ 
		va += l; 
		bp->rptr += l; 
		n -= l; 
1991/1115    
		nread += l; 
1991/1114    
		lock(&q->listlk); 
1991/1115    
		if(bp->rptr == bp->wptr) 
			f1 = bp; 
		else { 
1991/1114    
			bp->next = q->list; 
			q->list = bp; 
		} 
1991/1115    
		if(bp->flags&S_DELIM) { 
1991/1114    
			q->ndelim--; 
			break; 
1991/1115    
		} 
1991/1114    
	} 
1991/1115    
	q->nb -= nread; 
	unlock(&q->listlk); 
	if(f1) 
		freeb(f1); 
1991/1114    
	qunlock(&q->rd); 
1991/1115    
	poperror(); 
	if(q->nb < Flowctl) 
		wakeup(&q->flowr); 
1991/1114    
	return nread; 
1991/1115    
} 
 
Block * 
muxclq(Dtq *q) 
{ 
	Block *f; 
 
	f = q->list; 
	q->list = 0; 
	q->nb = 0; 
	q->ndelim = 0; 
	return f; 
1991/1114    
} 


source code copyright © 1990-2005 Lucent Technologies; see license
Plan 9 distribution
comments to russ cox (rsc@swtch.com)