Sun Microsystems, Inc.
spacerspacer
spacer www.sun.com docs.sun.com |
spacer
black dot
 
 
10.  STREAMS Modules Module Overview Filter Module Example  Previous   Contents   Next 
   
 

modwsrv() is the write service procedure. It takes a single argument, which is a pointer to the write queue. modwsrv() switches on the message type, M_FLUSH or M_DATA. modwsrv() processes only one high-priority message, M_FLUSH. No other high-priority messages should reach modwsrv. High-priority messages other than type M_FLUSH use putnext(9F) to avoid scheduling. The others are queued for the service procedure. An M_FLUSH message is a request to remove messages on one or both queues. It can be processed in the put or service procedure.

For an M_FLUSH message, modwsrv() checks the first data byte. If FLUSHW is set, the write queue is flushed by flushq(9F), which takes two arguments, the queue pointer and a flag. The flag indicates what should be flushed, data messages (FLUSHDATA) or everything (FLUSHALL). Data includes M_DATA, M_DELAY, M_PROTO, and M_PCPROTO messages. The choice of what types of messages to flush is specific to each module.

Ordinary messages are returned to the queue if canputnext(9F) returns false, indicating the downstream path is blocked.

The differences in M_DATA processing between this and the example in "Message Allocation and Freeing" relate to the manner in which the new messages are forwarded and flow controlled. For the purpose of demonstrating alternative means of processing messages, this version creates individual new messages rather than a single message containing multiple message blocks. When a new message block is full, it is immediately forwarded with putnext(9F) rather than being linked into a single large message. This alternative is not desirable because message boundaries are altered, and because of the additional overhead of handling and scheduling multiple messages.

When the filter processing is performed (following push), flow control is checked (with canputnext(9F)) after each new message is forwarded. This is necessary because there is no provision to hold the new message until the queue becomes unblocked. If the downstream path is blocked, the remaining part of the original message is returned to the queue. Otherwise, processing continues.

Data Flow Control

To support the STREAMS flow control mechanism, modules that use service procedures must invoke canputnext(9F) before calling putnext(9F), and use appropriate values for the high-water and low-water marks. If your module has a service procedure, you manage the flow control. If you don't have a service procedure, then there is no need to do anything.

The queue hiwat and lowat values limit the amount of data that can be placed on a queue. These limits prevent depletion of buffers in the buffer pool. Flow control is advisory in nature and can be bypassed. It is managed by high-water and low-water marks and regulated by the utility routines getq(9F), putq(9F), putbq(9F), insq(9F), rmvq(9F), and canputnext(9F).

The following scenario takes place normally in flow control:

A driver sends data to a module using putnext(9F), and the module's put procedure queues data using putq(9F). Calling putq(9F) enables the service procedure and executes it at some indeterminate time in the future. When the service procedure runs, it retrieves the data by calling getq(9F).

If the module cannot process data at the rate at which the driver is sending the data, the following happens:

When the message is queued, putq(9F) increments the value of q_count by the size of the message and compares the result to the module's high-water limit (q_hiwat) value for the queue. If the count reaches q_hiwat, putq(9F) sets the internal FULL indicator for the queue. This causes messages from upstream in the case of a write-side queue or downstream in the case of a read-side queue to be halted (canputnext(9F) returns FALSE) until the queue count drops below q_lowat. getq(9F) decrements the queue count. If the resulting count is below q_lowat, getq(9F) back-enables and causes the service procedure to be called for any blocked queue. (Flow control does not prevent reaching q_hiwat on a queue. Flow control can exceed its maximum value before canputnext(9F) detects QFULL and flow is stopped.)

The next example show a line discipline module's flow control. Example 10-6 shows a read-side line discipline module and a write-side line discipline module. Note that the read side is the same as the write side but without the M_IOCTL processing.


Example 10-6 Read-side Line Discipline Module

/* read side line discipline module flow control */
static mblk_t *read_canon(mblk_t *);

static int
ld_read_srv(
	queue_t *q)							/* pointer to read queue */
{
	mblk_t *mp;							/* original message */
	mblk_t *bp;							/* canonicalized message */

	while ((mp = getq(q)) != NULL) {
			switch (mp->b_datap->db_type) { /* type of msg */
			case M_DATA:	 /* data message */
				if (canputnext(q)) {
						bp = read_canon(mp);
						putnext(q, bp);
				} else {
						putbq(q, mp); /* put messagebackinqueue */
						return (0);
				}
				break;

			default:
				if (mp->b_datap->db_type >= QPCTL)
						putnext(q, mp); 		/* high-priority message */
				else { /* ordinary message */
						if (canputnext(q))
								 putnext(q, mp);
						else {
								 putbq(q, mp);
								 return (0);
						}
				}
				break;
			}
	}
return (0);
}

/* write side line discipline module flow control */
static int
ld_write_srv(
	queue_t *q)								/* pointer to write queue */
{
	mblk_t *mp;								/* original message */
	mblk_t *bp;								/* canonicalized message */

	while ((mp = getq(q)) != NULL) {
			switch (mp->b_datap->db_type) { /* type of msg */
			case M_DATA:			 /* data message */
				if (canputnext(q)) {
						bp = write_canon(mp);
						putnext(q, bp);
				} else {
						putbq(q, mp);
						return (0);
				}
				break;

			case M_IOCTL:
				ld_ioctl(q, mp);
				break;

			default:
				if (mp->b_datap->db_type >= QPCTL)
						putnext(q, mp);		/* high priority message */
				else { 						/* ordinary message */
						if (canputnext(q))		
								putnext(q, mp);
						else {
								putbq(q, mp);
								return (0);
						}
				}
				break;
			}
	}
return (0);
}

 
 
 
  Previous   Contents   Next