|
This srv(9E) (service) routine adds a header to all M_DATA messages before passing them along. dupb is used instead of copyb(9F) because the contents of the header block are not changed.
For each message on the queue, if it is a priority message, pass it along immediately (lines 10-11). Otherwise, if it is anything other than an M_DATA
message (line 12), and if it can be sent along (line 13), then do so (line 14). Otherwise, put the message back on the queue and return (lines 16-17). For all M_DATA messages, first check to see if the stream is flow-controlled (line 20). If it is, put the message back on the queue and return (lines 37-38). If it is not, the header block is duplicated
(line 21).
dupb() can fail either due to lack of resources or because the message block has already been duplicated 255 times. In order to handle the latter case, the example calls copyb(9F) (line 22). If copyb(9F) fails, it is due to buffer allocation failure. In this case, qbufcall(9F) is used to initiate a callback (lines 30-31) if one is not already pending (lines 26-27).
The callback function, xxxcallback(), clears the recorded qbufcall(9F)
callback id and schedules the service procedure (lines 49-50). Note that the close routine, xxxclose(), must cancel any outstanding qbufcall(9F) callback requests (lines 58-59).
If dupb() or copyb(9F) succeed, link the M_DATA message to the new message block (line 34) and pass it along (line 35).
|
1 xxxsrv(q)
2 queue_t *q;
3 {
4 struct xx *xx = (struct xx *)q->q_ptr;
5 mblk_t *mp;
6 mblk_t *bp;
7 extern mblk_t *hdr;
8
9 while ((mp = getq(q)) != NULL) {
10 if (mp->b_datap->db_type >= QPCTL) {
11 putnext(q, mp);
12 } else if (mp->b_datap->db_type != M_DATA) {
13 if (canputnext(q))
14 putnext(q, mp);
15 else {
16 putbq(q, mp);
17 return;
18 }
19 } else { /* M_DATA */
20 if (canputnext(q)) {
21 if ((bp = dupb(hdr)) == NULL)
22 bp = copyb(hdr);
23 if (bp == NULL) {
24 size_t size = msgdsize(mp);
25 putbq(q, mp);
26 if (xx->xx_qbufcall_id) {
27 /* qbufcall pending */
28 return;
29 }
30 xx->xx_qbufcall_id = qbufcall(q, size,
31 BPRI_MED, xxxcallback, (intptr_t)q);
32 return;
33 }
34 linkb(bp, mp);
35 putnext(q, bp);
36 } else {
37 putbq(q, mp);
38 return;
39 }
40 }
41 }
42 }
43 void
44 xxxcallback(q)
45 queue_t *q;
46 {
47 struct xx *xx = (struct xx *)q->q_ptr;
48
49 xx->xx_qbufcall_id = 0;
50 qenable(q);
51 }
52 xxxclose(q, cflag, crp)
53 queue_t *q;
54 int cflag;
55 cred_t *crp;
56 {
57 struct xx *xx = (struct xx *)q->q_ptr;
...
58 if (xx->xx_qbufcall_id)
59 qunbufcall(q, xx->xx_qbufcall_id);
...
60 }
|
|