Sun Microsystems, Inc.
10.  STREAMS Modules: Module Overview: Module Procedures

The steps of a service procedure are:

  1. Retrieve the first message from the queue using getq(9F).

  2. If the message is high priority, process it immediately and pass it along the stream.

    Otherwise, the service procedure should use canputnext(9F) to determine whether the next module or driver that enqueues messages is within acceptable flow-control limits. canputnext(9F) searches the stream for the next module, driver, or stream head that has a service procedure. When it finds one, it checks how much message space that queue currently holds. If that amount has reached the high-water mark, canputnext(9F) returns false (zero). If the queue is within acceptable flow-control limits, canputnext(9F) returns true (nonzero).

  3. If canputnext(9F) returns false, the service procedure returns the message to its own queue with putbq(9F). The service procedure can do no further processing at this time, and it returns to the caller.

    If canputnext(9F) returns true, the service procedure completes any processing of the message. This can involve retrieving more messages from the queue, allocating and deallocating header and trailer information, and performing control functions for the module.

  4. When the service procedure is finished processing the message, it calls putnext(9F) to pass the resulting message to the next queue.

These steps are repeated until getq(9F) returns NULL (the queue is empty) or canputnext(9F) returns false.
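
The loop described by these steps can be sketched in ordinary user-level C. The sketch below is only an illustration under stated assumptions: struct toy_queue and the toy_* functions are hypothetical stand-ins for queue_t, getq(9F), putbq(9F), canputnext(9F), and putnext(9F), messages are plain non-negative integers, the high-water mark counts messages rather than bytes, and high-priority handling is left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical userland stand-ins; none of these are the real STREAMS types. */
#define TOY_CAP 8

struct toy_queue {
	int    msgs[TOY_CAP];	/* message payloads (>= 0), oldest first */
	size_t count;		/* number of queued messages */
	size_t hiwat;		/* queue counts as full once count reaches this */
};

/* Analogue of getq(9F): remove and return the oldest message, or -1 if empty. */
static int toy_getq(struct toy_queue *q)
{
	int m;

	if (q->count == 0)
		return -1;
	m = q->msgs[0];
	for (size_t i = 1; i < q->count; i++)
		q->msgs[i - 1] = q->msgs[i];
	q->count--;
	return m;
}

/* Analogue of putbq(9F): put a message back at the head of the queue. */
static void toy_putbq(struct toy_queue *q, int m)
{
	assert(q->count < TOY_CAP);
	for (size_t i = q->count; i > 0; i--)
		q->msgs[i] = q->msgs[i - 1];
	q->msgs[0] = m;
	q->count++;
}

/* Append at the tail; models the effect of putnext(9F) on the next queue. */
static void toy_putq(struct toy_queue *q, int m)
{
	assert(q->count < TOY_CAP);
	q->msgs[q->count++] = m;
}

/* Analogue of canputnext(9F): nonzero while the next queue is below hiwat. */
static int toy_canput(const struct toy_queue *next)
{
	return next->count < next->hiwat;
}

/* The service loop from the steps above; returns how many messages moved. */
static int toy_service(struct toy_queue *q, struct toy_queue *next)
{
	int forwarded = 0;
	int m;

	while ((m = toy_getq(q)) != -1) {
		if (!toy_canput(next)) {
			toy_putbq(q, m);	/* step 3: give the message back */
			break;
		}
		toy_putq(next, m);		/* step 4: pass it along */
		forwarded++;
	}
	return forwarded;
}
```

With four messages queued and a downstream limit of two, one call to toy_service moves two messages and leaves the other two queued in their original order. A later call, after the downstream queue has drained, moves the rest.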

Filter Module Example

The crmod module shown in Example 10-4 is an asymmetric filter. On the write side, each newline is changed to a carriage return followed by a newline. On the read side, no conversion is done.
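
The write-side conversion can be illustrated in isolation from STREAMS. The following is a minimal user-level sketch, not part of the module: nl_to_crnl is a hypothetical helper that works on flat buffers, whereas the module itself works on chains of message blocks.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper: copy src to dst, expanding each '\n' to "\r\n".
 * Returns the number of bytes written, or 0 if dst is too small
 * (0 is also returned for empty input).
 */
static size_t nl_to_crnl(const char *src, size_t srclen,
    char *dst, size_t dstcap)
{
	size_t out = 0;

	assert(src != NULL && dst != NULL);
	for (size_t i = 0; i < srclen; i++) {
		size_t need = (src[i] == '\n') ? 2 : 1;

		if (out + need > dstcap)
			return 0;	/* not enough room for this character */
		if (src[i] == '\n')
			dst[out++] = '\r';
		dst[out++] = src[i];
	}
	return out;
}
```

Because the output can be up to twice the size of the input, a caller of such a helper would size dst at 2 * srclen in the worst case. The module avoids that up-front sizing by appending to a new message one byte at a time.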


Example 10-4 Filter Module

/* Simple filter
 * converts newline -> carriage return, newline
 */
#include <sys/types.h>
#include <sys/param.h>
#include <sys/stream.h>
#include <sys/stropts.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>

/* module ID, name, min packet size, max packet size, high-water, low-water */
static struct module_info minfo =
	{ 0x09, "crmod", 0, INFPSZ, 512, 128 };

static int modopen (queue_t*, dev_t*, int, int, cred_t*);
static int modrput (queue_t*, mblk_t*);
static int modwput (queue_t*, mblk_t*);
static int modwsrv (queue_t*);
static int modclose (queue_t*, int, cred_t*);

static struct qinit rinit = {
	modrput, NULL, modopen, modclose, NULL, &minfo, NULL};

static struct qinit winit = {
	modwput, modwsrv, NULL, NULL, NULL, &minfo, NULL};

struct streamtab crmdinfo = { &rinit, &winit, NULL, NULL };

stropts.h is included for its definitions of the flush message options (such as FLUSHW), which are common to user applications. modrput is like modput from the null module.

In contrast to the null module example, a single module_info structure is shared by the read side and the write side. The module_info includes the flow-control high-water and low-water marks (512 and 128) for the write queue. (Although the same module_info is used on the read side, the read side has no service procedure, so flow control is not used there.) The write-side qinit, winit, contains the service procedure pointer, modwsrv.
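
The role of the two marks can be sketched with a small user-level model. Everything here is hypothetical: struct toy_fc is not the real queue_t, and the exact comparison used to leave the full state (strictly below the low-water mark) is an assumption of the sketch, not something this section states.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of one queue's flow-control state; not the real queue_t. */
struct toy_fc {
	size_t count;	/* bytes currently queued */
	size_t hiwat;	/* high-water mark */
	size_t lowat;	/* low-water mark */
	int    full;	/* nonzero while upstream should hold off */
};

/* Account for nbytes being enqueued. */
static void toy_fc_add(struct toy_fc *fc, size_t nbytes)
{
	fc->count += nbytes;
	if (fc->count >= fc->hiwat)
		fc->full = 1;
}

/* Account for nbytes being dequeued. */
static void toy_fc_remove(struct toy_fc *fc, size_t nbytes)
{
	assert(nbytes <= fc->count);
	fc->count -= nbytes;
	/* Assumption: the full state ends only once count drops below lowat. */
	if (fc->count < fc->lowat)
		fc->full = 0;
}
```

With the marks from minfo (512 and 128), the model becomes full at 512 queued bytes and stays full while the count drains through the range between the marks. The gap between the two marks keeps the state from toggling on every message.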

The write-side put procedure, the beginning of the service procedure, and an example of flushing a queue are shown in Example 10-5.


Example 10-5 Flushing a Queue

static int
modwput(queue_t *q, mblk_t *mp)
{
	/* High-priority messages other than M_FLUSH bypass the queue */
	if (mp->b_datap->db_type >= QPCTL && mp->b_datap->db_type != M_FLUSH)
		putnext(q, mp);
	else
		putq(q, mp);	/* Put it on the queue */
	return (0);
}

/* bappend() is not defined in this example; prototype inferred from its use. */
static int bappend(mblk_t **, int);

static int
modwsrv(queue_t *q)
{
	mblk_t *mp;

	while ((mp = getq(q)) != NULL) {
		switch (mp->b_datap->db_type) {
		default:
			/* Pass other messages along if flow control allows */
			if (canputnext(q)) {
				putnext(q, mp);
				break;
			} else {
				putbq(q, mp);
				return (0);
			}

		case M_FLUSH:
			if (*mp->b_rptr & FLUSHW)
				flushq(q, FLUSHDATA);
			putnext(q, mp);
			break;

		case M_DATA: {
			mblk_t *nbp = NULL;
			mblk_t *next;

			if (!canputnext(q)) {
				putbq(q, mp);
				return (0);
			}
			/* Filter data, appending to queue */
			for (; mp != NULL; mp = next) {
				while (mp->b_rptr < mp->b_wptr) {
					if (*mp->b_rptr == '\n')
						if (!bappend(&nbp, '\r'))
							goto push;
					if (!bappend(&nbp, *mp->b_rptr))
						goto push;
					mp->b_rptr++;
					continue;
				push:
					/* bappend failed: send what was built so far */
					if (nbp)
						putnext(q, nbp);
					nbp = NULL;
					if (!canputnext(q)) {
						if (mp->b_rptr >= mp->b_wptr) {
							next = mp->b_cont;
							freeb(mp);
							mp = next;
						}
						if (mp)
							putbq(q, mp);
						return (0);
					}
				} /* while */
				next = mp->b_cont;
				freeb(mp);
				if (nbp) {
					putnext(q, nbp);
					/* nbp now belongs downstream; start afresh */
					nbp = NULL;
				}
			} /* for */
			break;
		}
		} /* switch */
	}
	return (0);
}
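
Example 10-5 calls bappend, which this section does not define. As a rough idea of what such a helper has to do, the following user-level sketch appends one byte to a chain of fixed-size blocks and reports allocation failure with a zero return. It is an assumption-laden analogue, not the module's code: struct toy_blk, toy_bappend, toy_freemsg, and TOY_BLKSZ are hypothetical, and calloc(3C) stands in for kernel message allocation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

#define TOY_BLKSZ 4	/* tiny on purpose so that chaining is exercised */

/* Hypothetical stand-in for mblk_t: a buffer plus a b_cont-style link. */
struct toy_blk {
	unsigned char   buf[TOY_BLKSZ];
	size_t          used;	/* bytes written so far */
	struct toy_blk *cont;	/* next block in the same message */
};

/*
 * Append one byte to the message at *mpp, allocating the first block or
 * chaining a new one when the last block is full.
 * Returns 1 on success, 0 if allocation fails (message left unchanged).
 */
static int toy_bappend(struct toy_blk **mpp, int ch)
{
	struct toy_blk *last;

	assert(mpp != NULL);
	last = *mpp;
	while (last != NULL && last->cont != NULL)
		last = last->cont;

	if (last == NULL || last->used == TOY_BLKSZ) {
		struct toy_blk *nb = calloc(1, sizeof (*nb));

		if (nb == NULL)
			return 0;
		if (last == NULL)
			*mpp = nb;	/* first block of a new message */
		else
			last->cont = nb;
		last = nb;
	}
	last->buf[last->used++] = (unsigned char)ch;
	return 1;
}

/* Free every block in a message. */
static void toy_freemsg(struct toy_blk *mp)
{
	while (mp != NULL) {
		struct toy_blk *next = mp->cont;

		free(mp);
		mp = next;
	}
}
```

Returning zero on failure matches how modwsrv uses bappend: a failed append jumps to the push label, which sends the partial message downstream and rechecks flow control.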