/*--------------------------------------------------------------------*/
/* Example 1: send one mapped message to queue pcpad\testq.           */
/*                                                                    */
/* Steps: resolve the queue pathname to a format-name handle (the     */
/* queue is created when MSMQ reports it does not exist), open the    */
/* queue for sending, build a map descriptor and a data descriptor    */
/* for four values, send them as one message, then release every      */
/* handle that was obtained.  Any failure prints SYSMSG() and jumps   */
/* to the cleanup section at label EXIT.                              */
/*--------------------------------------------------------------------*/
data _null_;
  length rc 8;
  length msg $ 200;
  length Qid hQueue transobj 8;
  length msgid $ 40;
  length hData hMap 8;
  length parm1 parm2 parm3 8;
  length parm4 $ 50;

  /* zero the handles so the cleanup section can tell what was acquired */
  hQueue=0;
  hMap=0;
  hData=0;

  put '-------- Obtain formatname from pathname ------';
  Qid=0;
  rc=0;
  call msmqpathtoformat("pcpad\testq", Qid, rc);
  if rc ^= 0 then do;
    if rc = input('03000EC0'x, ib4.) then do;
      /* C00E0003 - MSMQ QUEUE_NOT_FOUND error */
      /* so create it... */
      put 'Queue does not exist so creating it...';
      call msmqcreatequeue(Qid, rc, "PATHNAME,LABEL",
                           "pcpad\testq", "Test Queue");
      if rc ^= 0 then do;
        put 'MSMQCreateQueue: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
      else put 'MSMQCreateQueue: succeeded';
    end;
    else do;
      /* any other error is fatal for this example */
      put 'MSMQPathToFormat: failed';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
  end;
  else put 'MSMQPathToFormat: succeeded';

  put '----------- Open queue for sending -----------';
  call msmqopenqueue(Qid, "SEND", "SHARE", hQueue, rc);
  if rc ^= 0 then do;
    put 'MSMQOpenQueue: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;
  else put 'MSMQOpenQueue: succeeded';

  put '----------- Generate map descriptor -----------';
  /* data will not be aligned */
  desc1="SHORT";
  desc2="LONG";
  desc3="DOUBLE";
  desc4="CHAR,,50"; /* blank pad to 50 bytes */
  call msmqmap(hMap, rc, desc1, desc2, desc3, desc4);
  if rc ^= 0 then do;
    put 'MSMQMap: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;
  else put 'MSMQMap: succeeded';

  put '--- Generate data descriptor - actual data ----';
  parm1=100;
  parm2=9999;
  parm3=9999.9999;
  parm4="This is a test.";
  call msmqsetparms(hData, hMap, rc, parm1,
                    parm2, parm3, parm4);
  if rc ^= 0 then do;
    put 'MSMQSetParms: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;
  else put 'MSMQSetParms: succeeded';

  put '------------ Send message to queue ------------';
  transobj=0;
  msgid="";
  /* The property list is kept on ONE source line: a quoted string    */
  /* that spans lines takes its value from how the lines are joined.  */
  /* The values that follow are supplied in the same order as the     */
  /* property names; msgid is filled in by the call.                  */
  call msmqsendmsg(hQueue, hData, transobj, rc,
       "BODY_TYPE,CORRELATIONID,LABEL,MSGID,PRIV_LEVEL,RESP_QUEUE",
       999, "0102030405060708090A0B0C0D0E0F1011121314",
       "Secret test message", msgid,
       "PRIVATE", "pcpad\respq");
  if rc ^= 0 then do;
    put 'MSMQSendMsg: failed';
    msg = sysmsg();
    put msg;
  end;
  else do;
    put 'MSMQSendMsg: succeeded';
    /* display MSMQ-generated MSGID */
    put 'msgid is ' msgid;
  end;

  /* ---- cleanup: reached on success and on every failure path ---- */
exit:
  if hQueue ^= 0 then do;
    put '----------------- Close queue ---------------';
    call msmqclosequeue(hQueue, rc);
    if rc ^= 0 then do;
      put 'MSMQCloseQueue: failed';
      msg = sysmsg();
      put msg;
    end;
    else put 'MSMQCloseQueue: succeeded';
  end;
  if Qid ^= 0 then do;
    call msmqfree(Qid);
    put 'Qid handle freed';
  end;
  if hMap ^= 0 then do;
    call msmqfree(hMap);
    put 'Map descriptor handle freed';
  end;
  if hData ^= 0 then do;
    call msmqfree(hData);
    put 'Data descriptor handle freed';
  end;
/* the DATA keyword after RUN; opens the following step */
run; data
_null_;
  /*------------------------------------------------------------------*/
  /* Example 2: receive one message from queue pcpad\testq, print its */
  /* properties, unpack the body through a map descriptor when the    */
  /* body is not empty, and post a short reply to the response queue  */
  /* named in the message (if any).  Every failure prints SYSMSG()    */
  /* and jumps to the cleanup section at label EXIT.                  */
  /*------------------------------------------------------------------*/
  length rc 8;
  length msg $ 200;
  length Qid hQueue transobj 8;
  length hMap hData hCursor 8;
  length arrivet auth size sentt 8;
  length correlid msgid $ 40;
  length label $ 80;
  length parm1 parm2 parm3 8;
  length parm4 $ 50;
  length hRespQ 8;
  length respq $ 80;
  length respQid 8;

  /* zero the handles so the cleanup section can tell what was acquired */
  hQueue=0;
  hMap=0;
  hData=0;
  hRespQ=0;
  respQid=0;

  put '-------- Obtain formatname from pathname ------';
  Qid=0;
  rc=0;
  call msmqpathtoformat("pcpad\testq", Qid, rc);
  if rc ^= 0 then do;
    put 'MSMQPathToFormat: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;
  else put 'MSMQPathToFormat: succeeded';

  put '----------- Open queue for receiving ---------';
  call msmqopenqueue(Qid, "RECEIVE", "SHARE", hQueue, rc);
  if rc ^= 0 then do;
    put 'MSMQOpenQueue: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;
  else put 'MSMQOpenQueue: succeeded';

  put '----------Receive message from queue ---------';
  transobj=0;
  hCursor=0;
  /* Property list kept on one source line; the output variables that */
  /* follow are listed in the same order as the property names.       */
  call msmqreceivemsg(hQueue, 0, "RECEIVE", hCursor, transobj, rc,
       "ARRIVEDTIME,AUTHENTICATED,BODY_SIZE,CORRELATIONID,LABEL,MSGID,RESP_QUEUE,SENTTIME",
       arrivet, auth, size, correlid, label, msgid,
       respq, sentt);
  if rc ^= 0 then do;
    put 'MSMQReceiveMsg: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;
  else do;
    put 'MSMQReceiveMsg: succeeded';
    /* convert MSMQ arrived time to SAS datetime format              */
    /* NOTE(review): adds 10 years + 3 days and subtracts 5 hours;   */
    /* the 5-hour term looks like a fixed time-zone offset - confirm */
    arrivet =
      arrivet + 10*365*24*3600 + 3*24*3600 - 5*3600;
    put 'arrived time is' arrivet datetime.;
    if auth = 1 then put 'message was authenticated';
    else put 'message was not authenticated';
    put 'message body size is ' size;
    put 'correlation id is ' correlid;
    put 'label is ' label;
    put 'msg id is ' msgid;
    put 'resp_queue Qid handle is ' respq;
    /* convert MSMQ sent time to SAS datetime format */
    sentt = sentt + 10*365*24*3600 + 3*24*3600 - 5*3600;
    put 'sent time was' sentt datetime.;
  end;

  if size ^= 0 then do;
    put '---------- Generate map descriptor ----------';
    desc1="SHORT";
    desc2="LONG";
    desc3="DOUBLE";
    desc4="CHAR,,50";
    call msmqmap(hMap, rc, desc1, desc2, desc3, desc4);
    if rc ^= 0 then do;
      put 'MSMQMap: failed';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
    else put 'MSMQMap: succeeded';
    /* unpack the received body into the four variables */
    call msmqgetparms(hMap, rc, parm1,
                      parm2, parm3, parm4);
    if rc ^= 0 then do;
      put 'MSMQGetParms: failed';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
    else do;
      put 'MSMQGetParms: succeeded';
      put 'parm1 = ' parm1;
      put 'parm2 = ' parm2;
      put 'parm3 = ' parm3;
      put 'parm4 = ' parm4;
    end;
  end;
  else put 'No data was associated with the message';

  /* post a reply to the response queue if available */
  if respq ^= "" then do;
    call msmqpathtoformat(respq, respQid, rc);
    if rc ^= 0 then do;
      put 'MSMQPathToFormat: failed to open response queue';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
    call msmqopenqueue(respQid, "SEND",
                       "SHARE", hRespQ, rc);
    if rc ^= 0 then do;
      put 'MSMQOpenQueue: failed to open response queue';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
    /* The reply is sent with a zero map handle.  Release the map    */
    /* created for the received body first, otherwise zeroing the    */
    /* variable would lose the only reference to that descriptor.    */
    if hMap ^= 0 then call msmqfree(hMap);
    hMap=0;
    call msmqsetparms(hData, hMap, rc,
                      "Message received OK");
    if rc ^= 0 then do;
      put 'MSMQSetParms: failed to send response message';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
    transobj=0;
    call msmqsendmsg(hRespQ, hData, transobj, rc);
    if rc ^= 0 then do;
      put 'MSMQSendMsg: failed to send response message';
      msg = sysmsg();
      put msg;
    end;
    else put 'reply sent to the response queue';
  end;

  /* ---- cleanup: reached on success and on every failure path ---- */
exit:
  if hQueue ^= 0 then do;
    put '----------------- Close queue ---------------';
    call msmqclosequeue(hQueue, rc);
    if rc ^= 0 then do;
      put 'MSMQCloseQueue: failed';
      msg = sysmsg();
      put msg;
    end;
    else put 'MSMQCloseQueue: succeeded';
  end;
  if hRespQ ^= 0 then do;
    put '------------ Close Response Queue -----------';
    call msmqclosequeue(hRespQ, rc);
    if rc ^= 0 then do;
      put 'MSMQCloseQueue: failed to close response queue';
      msg = sysmsg();
      put msg;
    end;
    else put 'MSMQCloseQueue: succeeded to close response queue';
  end;
  if Qid ^= 0 then do;
    call msmqfree(Qid);
    put 'Qid handle freed';
  end;
  if respQid ^= 0 then do;
    call msmqfree(respQid);
    put 'respQid handle freed';
  end;
  if hMap ^= 0 then do;
    call msmqfree(hMap);
    put 'Map descriptor handle freed';
  end;
  /* the reply's data descriptor was previously never released */
  if hData ^= 0 then do;
    call msmqfree(hData);
    put 'Data descriptor handle freed';
  end;
run;
/*--------------------------------------------------------------------*/
/* Example 3: send text file d:\test.txt to queue .\testq, one        */
/* message per record.  Every message carries appspecific=100000 and  */
/* a correlation id made of a fixed 14-character prefix followed by a */
/* 4-digit hexadecimal sequence number.  The queue is created when    */
/* MSMQ reports it does not exist.  Failures print SYSMSG() and jump  */
/* to the cleanup section at label EXIT.                              */
/*--------------------------------------------------------------------*/
data _null_;
  length rc 8;
  length msg $ 200;
  length Qid hQueue hmap hdata 8;
  length appspec 8;
  length corrid $ 40;
  length record $ 256;
  length seqno 8 seqstr $ 4;
  /* send this file to the queue */
  infile 'd:\test.txt' length=reclen end=eof;

  /* zero the handles so the cleanup section can tell what was acquired */
  Qid=0;
  hQueue=0;
  hmap=0;
  hdata=0;

  put '--------- Obtain Formatname from Pathname -------';
  call msmqpathtoformat(".\testq", Qid, rc);
  if( rc ) then do;
    /* same return-code test the sender example labels QUEUE_NOT_FOUND */
    if( rc = input('03000EC0'x,ib4.) ) then do;
      put 'Queue does not exist so create it...';
      call msmqcreatequeue(Qid, rc, "pathname,label",
                           ".\testq", "test queue");
      if( rc ) then do;
        put 'MSMQCreateQueue: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
    end;
    else do;
      put 'MSMQPathToFormat: failed';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
  end;

  put '------------------ Open Queue ------------------';
  call msmqopenqueue(Qid, "SEND", "SHARE", hQueue, rc);
  if( rc ) then do;
    put 'MSMQOpenQueue: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  put '----------- Generate map descriptor -----------';
  /* longest record in file is 255 bytes+1 length byte... */
  /* therefore all messages on the queue pertaining to */
  /* this file will be blank-padded for 256 bytes... */
  call msmqmap(hmap, rc, "char,,256");
  if rc ^= 0 then do;
    put 'MSMQMap: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  /* designate that messages belong to a text file */
  appspec=100000;
  /* all of these messages will have
     the same correlationid+seqno */
  corrid="46696c65212121"; /* File!!! */
  seqno = 0;

  do until(eof);
    /* first INPUT holds the line so RECLEN is known to the second */
    input @;
    input record $varying256. reclen;
    call msmqsetparms(hdata, hmap, rc, record);
    if( rc ) then do;
      put 'MSMQSetParms: failed';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
    /* add sequence # to correlationid */
    seqstr = put(seqno, hex4.);
    substr(corrid,15,4) = seqstr;
    seqno = seqno+1;
    put '--- Send message to queue ----';
    call msmqsendmsg(hQueue, hdata, 0, rc,
         "appspecific,correlationid", appspec, corrid);
    if( rc ) then do;
      put 'MSMQSendMsg: failed';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
    /* free data; zero the handle so it is neither reused on the   */
    /* next iteration nor freed a second time during cleanup       */
    call msmqfree(hdata);
    hdata=0;
  end;

  /* ---- cleanup: reached on success and on every failure path ---- */
exit:
  /* a data descriptor is still live when a send failed mid-loop */
  if( hdata ) then
    call msmqfree(hdata);
  if( hQueue ) then do;
    call msmqclosequeue(hQueue, rc);
    if( rc ) then do;
      put 'MSMQCloseQueue: failed';
      msg = sysmsg();
      put msg;
    end;
  end;
  if( Qid ) then
    call msmqfree(Qid);
  if( hmap ) then
    call msmqfree(hmap);
  /* the loop above consumed the whole file; do not iterate again */
  stop;
/* the FILENAME statement begun after RUN; continues on the next line */
run; filename output
'd:\testdup.txt';
/*--------------------------------------------------------------------*/
/* Example 4: rebuild a text file from messages on queue .\testq.     */
/*                                                                    */
/* Messages are peeked through a cursor.  Those with appspecific =    */
/* 100000 whose correlation-id prefix (first 14 characters) matches   */
/* the first such message seen are written, in sequence-number order, */
/* to fileref OUTPUT and then removed from the queue.  When the       */
/* message under the main cursor is not the expected sequence number, */
/* a second cursor scans the queue for the expected one.              */
/* Failures print a message and jump to the cleanup section at EXIT.  */
/*--------------------------------------------------------------------*/
data _null_;
  length rc 8;
  length msg $ 200;
  length Qid hQueue hmap hCursor hCursor2 8;
  length corrid corrid2 filecorrid $ 40;
  length appspec 8;
  length action action2 $ 12;
  length record $ 256;
  length seqno 8;

  /* zero the handles so the cleanup section can tell what was acquired */
  Qid=0;
  hQueue=0;
  hmap=0;
  hCursor=0;
  hCursor2=0;

  fileid = fopen('output', 'o', 256, 'v');
  if( fileid = 0 ) then do;
    put 'Error opening output file...';
    goto exit;
  end;

  put '--------- Obtain Formatname from Pathname -------';
  call msmqpathtoformat(".\testq", Qid, rc);
  if( rc ) then do;
    put 'MSMQPathToFormat: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  put '------------------ Open Queue ------------------';
  call msmqopenqueue(Qid, "RECEIVE", "SHARE", hQueue, rc);
  if( rc ) then do;
    put 'MSMQOpenQueue: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  call msmqcreatecursor(hQueue, hCursor, rc);
  if( rc ) then do;
    put 'MSMQCreateCursor failed';
    msg = sysmsg();
    put msg;
    /* nothing below can work without a cursor */
    goto exit;
  end;

  /* peek first to see if belongs to the file you want */
  action="PEEK_CURRENT";
  seqno=0;

recv:
  call msmqreceivemsg(hQueue, 0, action, hCursor, 0, rc,
       "APPSPECIFIC,CORRELATIONID", appspec, corrid);
  if( rc ) then do;
    /* this return code is the normal way out of the loop */
    if( rc = input('1B000EC0'x,ib4.) ) then do;
      put 'reached end of queue';
      goto exit;
    end;
    put 'MSMQReceiveMsg: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  /* default action */
  action="PEEK_NEXT";
  if( appspec = 100000 ) then do;
    /* file processing... */
    outofseq=0;
    if( filecorrid = "" ) then do;
      /* file begins at this message */
      /* write all correlating messages to this file */
      filecorrid = substr(corrid,1,14);
      put '--------- Generate map descriptor ---------';
      /* all file messages were sent to the queue as
         256 bytes blank-padded */
      call msmqmap(hmap, rc, "char,,256");
      if( rc ) then do;
        put 'MSMQMap: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
    end;

    /* make sure message belongs to this file */
    if( substr(corrid,1,14) = filecorrid ) then do;
      if( seqno ^= input(substr(corrid,15,4),
                         hex4.) ) then do;
        /* this message is out of sequence
           so search for it */
        outofseq=1;
        call msmqcreatecursor(hQueue, hCursor2, rc);
        if( rc ) then do;
          put 'MSMQCreateCursor failed';
          msg = sysmsg();
          put msg;
          /* cannot search without the second cursor */
          goto exit;
        end;
        action2="PEEK_CURRENT";
peeknxt:
        call msmqreceivemsg(hQueue, 0, action2,
             hCursor2, 0, rc, "CORRELATIONID", corrid2);
        if( rc ) then do;
          if( rc = input('1B000EC0'x,ib4.) ) then do;
            put 'Error: reached end of queue while searching for out-of-sequence msg';
            goto exit;
          end;
          put 'MSMQReceiveMsg: failed';
          msg = sysmsg();
          put msg;
          goto exit;
        end;
        /* keep scanning until the expected sequence number shows up */
        if( seqno ^= input(substr(corrid2,15,4),
                           hex4.) ) then do;
          action2="PEEK_NEXT";
          goto peeknxt;
        end;
      end;

      /* increment sequence number for next
         expected message */
      seqno=seqno+1;
      /* retrieve record from internal buffer */
      call msmqgetparms(hmap, rc, record);
      if( rc ) then do;
        put 'MSMQGetParms: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
      put 'write record to file';
      rc = fput(fileid, record);
      if( rc ) then do;
        put 'Error writing to output file buffer...';
        goto exit;
      end;
      /* flush it to disk */
      rc = fwrite(fileid);
      if( rc ) then do;
        put 'Error writing to output file...';
        goto exit;
      end;

      /* now remove it from the queue...
         don't care about receiving body */
      /* NOTE(review): rc of the removing RECEIVE is not checked */
      body=0;
      if( outofseq ) then do;
        call msmqreceivemsg(hQueue, 0, "RECEIVE",
             hCursor2, 0, rc, "body", body);
        /* close this cursor */
        call msmqclosecursor(hCursor2, rc);
        hCursor2=0;
      end;
      else do;
        call msmqreceivemsg(hQueue, 0, "RECEIVE",
             hCursor, 0, rc, "body", body);
      end;
      /* we are now pointing at the next message */
      action="PEEK_CURRENT";
    end;
  end;
  /* finish retrieving all messages belonging
     to this file */
  goto recv;

  /* ---- cleanup: reached from every exit path ---- */
exit:
  /* cursors still open at this point are closed before the queue */
  if( hCursor2 ) then
    call msmqclosecursor(hCursor2, rc);
  if( hCursor ) then
    call msmqclosecursor(hCursor, rc);
  if( hQueue ) then do;
    call msmqclosequeue(hQueue, rc);
    if( rc ) then do;
      put 'MSMQCloseQueue: failed';
      msg = sysmsg();
      put msg;
    end;
  end;
  if( Qid ) then
    call msmqfree(Qid);
  if( hmap ) then
    call msmqfree(hmap);
  /* close file - only when FOPEN actually returned an id */
  if( fileid ) then do;
    rc = fclose(fileid);
    if( rc ) then put 'Error closing output file';
  end;
/* the DATA keyword after RUN; opens the following step */
run; data
_null_;
  /*------------------------------------------------------------------*/
  /* Example 5: send binary file d:\test.exe to queue .\testq in      */
  /* 256-byte messages.  Byte 1 of each message holds the number of   */
  /* data bytes that follow (at most 255).  Every message carries     */
  /* appspecific=100001 and a correlation id made of a fixed          */
  /* 20-character prefix plus a 4-digit hexadecimal sequence number.  */
  /* Failures print SYSMSG() and jump to the cleanup section at EXIT. */
  /*------------------------------------------------------------------*/
  length rc 8;
  length msg $ 200;
  length Qid hQueue hmap hdata 8;
  length appspec 8;
  length corrid $ 40;
  length msgbuf $ 256;
  length seqno 8 seqstr $ 4;
  /* read in as a stream of bytes */
  infile 'd:\test.exe' recfm=f lrecl=1 end=eof;

  /* zero the handles so the cleanup section can tell what was acquired */
  Qid=0;
  hQueue=0;
  hmap=0;
  hdata=0;

  put '--------- Obtain Formatname from Pathname -------';
  call msmqpathtoformat(".\testq", Qid, rc);
  if( rc ) then do;
    /* same return-code test the sender example labels QUEUE_NOT_FOUND */
    if( rc = input('03000EC0'x,ib4.) ) then do;
      put 'Queue does not exist so create it';
      call msmqcreatequeue(Qid, rc, "pathname,label",
                           ".\testq", "test queue:");
      if( rc ) then do;
        put 'MSMQCreateQueue: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
    end;
    else do;
      put 'MSMQPathToFormat: failed';
      msg = sysmsg();
      put msg;
      goto exit;
    end;
  end;

  put '------------------ Open Queue ------------------';
  call msmqopenqueue(Qid, "SEND", "SHARE", hQueue, rc);
  if( rc ) then do;
    put 'MSMQOpenQueue: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  put '----------- Generate map descriptor -----------';
  /* send 256 byte messages to the queue */
  call msmqmap(hmap, rc, "char,,256");
  if( rc ) then do;
    put 'MSMQMap: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  /* designate messages belong to a binary file */
  appspec=100001;
  /* all of these messages will have the
     same correlationid */
  corrid="42696e46696c65212121"; /* BinFile!!! */
  seqno = 0;
  /* i is the position of the last byte stored; position 1 is */
  /* reserved for the length byte                             */
  i=1;
  do until(eof);
    /* read a byte at a time */
    input x $char1.;
    i+1;
    substr(msgbuf,i,1) = x;
    /* flush when the buffer is full or the file is exhausted */
    if i = 256 or eof then do;
      /* set length of this record embedded
         as first byte of message */
      substr(msgbuf,1,1) = put(i-1,pib1.);
      call msmqsetparms(hdata, hmap, rc, msgbuf);
      if( rc ) then do;
        put 'MSMQSetParms: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
      /* add sequence # to correlationid */
      seqstr = put(seqno, hex4.);
      substr(corrid,21,4) = seqstr;
      seqno = seqno + 1;
      put '--- Send message to queue ----';
      /* property list kept on one source line; the values that    */
      /* follow are given in the same order as the property names  */
      call msmqsendmsg(hQueue, hdata, 0, rc,
           "appspecific,correlationid,acknowledge,admin_queue",
           appspec, corrid,
           "nack_reach_queue", ".\adminq");
      if( rc ) then do;
        put 'MSMQSendMsg: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
      /* free data; zero the handle so it is neither reused for    */
      /* the next message nor freed a second time during cleanup   */
      call msmqfree(hdata);
      hdata=0;
      /* reset message buffer entities */
      i=1;
      msgbuf="";
    end;
  end;

  /* ---- cleanup: reached on success and on every failure path ---- */
exit:
  /* a data descriptor is still live when a send failed mid-loop */
  if( hdata ) then
    call msmqfree(hdata);
  if( hQueue ) then do;
    call msmqclosequeue(hQueue, rc);
    if( rc ) then do;
      put 'MSMQCloseQueue: failed';
      msg = sysmsg();
      put msg;
    end;
  end;
  if( Qid ) then
    call msmqfree(Qid);
  if( hmap ) then
    call msmqfree(hmap);
  /* the loop above consumed the whole file; do not iterate again */
  stop;
run; filename output 'd:\testdup.exe';
/*--------------------------------------------------------------------*/
/* Example 6: rebuild a binary file from messages on queue .\testq.   */
/*                                                                    */
/* Messages are peeked through a cursor.  Those with appspecific =    */
/* 100001 whose correlation-id prefix (first 20 characters) matches   */
/* the first such message seen are processed in sequence-number       */
/* order: byte 1 of the body gives the count of data bytes, which are */
/* written to fileref OUTPUT, and the message is then removed from    */
/* the queue.  When the message under the main cursor is not the      */
/* expected sequence number, a second cursor scans for the expected   */
/* one.  Failures print a message and jump to the cleanup at EXIT.    */
/*--------------------------------------------------------------------*/
data _null_;
  length rc 8;
  length msg $ 200;
  length Qid hQueue hmap hCursor hCursor2 8;
  length corrid corrid2 filecorrid $ 40;
  length appspec 8;
  length action action2 $ 12;
  length msgbuf stream $ 256;
  length len 8;
  length seqno 8;

  /* zero the handles so the cleanup section can tell what was acquired */
  Qid=0;
  hQueue=0;
  hmap=0;
  hCursor=0;
  hCursor2=0;

  fileid = fopen('output', 'o', 0, 'b');
  if( fileid = 0 ) then do;
    put 'Error opening output file...';
    goto exit;
  end;

  put '--------- Obtain Formatname from Pathname -------';
  call msmqpathtoformat(".\testq", Qid, rc);
  if( rc ) then do;
    put 'MSMQPathToFormat: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  put '------------------ Open Queue ------------------';
  call msmqopenqueue(Qid, "RECEIVE", "SHARE", hQueue, rc);
  if( rc ) then do;
    put 'MSMQOpenQueue: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  call msmqcreatecursor(hQueue, hCursor, rc);
  if( rc ) then do;
    put 'MSMQCreateCursor failed';
    msg = sysmsg();
    put msg;
    /* nothing below can work without a cursor */
    goto exit;
  end;

  /* peek first to see if belongs to the file you want */
  action="PEEK_CURRENT";
  seqno=0;

recv:
  call msmqreceivemsg(hQueue, 0, action, hCursor, 0, rc,
       "APPSPECIFIC,CORRELATIONID", appspec, corrid);
  if( rc ) then do;
    /* this return code is the normal way out of the loop */
    if( rc = input('1B000EC0'x,ib4.) ) then do;
      put 'reached end of queue';
      goto exit;
    end;
    put 'MSMQReceiveMsg: failed';
    msg = sysmsg();
    put msg;
    goto exit;
  end;

  /* default action */
  action="PEEK_NEXT";
  if( appspec = 100001 ) then do;
    /* file processing */
    outofseq=0;
    if( filecorrid = "" ) then do;
      /* file begins at this message */
      /* write all correlating messages to this file */
      filecorrid = substr(corrid,1,20);
      put '--------- Generate map descriptor ---------';
      /* all file messages were sent to the queue as
         256 bytes blank-padded */
      call msmqmap(hmap, rc, "char,,256");
      if( rc ) then do;
        put 'MSMQMap: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
    end;

    /* make sure message belongs to this file */
    if( substr(corrid,1,20) = filecorrid ) then do;
      if( seqno ^= input(substr(corrid,21,4), hex4.) )
      then do;
        /* this message is out of sequence
           so search for it */
        outofseq=1;
        call msmqcreatecursor(hQueue, hCursor2, rc);
        if( rc ) then do;
          put 'MSMQCreateCursor failed';
          msg = sysmsg();
          put msg;
          goto exit;
        end;
        action2="PEEK_CURRENT";
peeknxt:
        call msmqreceivemsg(hQueue, 0, action2,
             hCursor2, 0, rc, "CORRELATIONID", corrid2);
        if( rc ) then do;
          if( rc = input('1B000EC0'x, ib4.) ) then do;
            put 'Error: reached end of queue while searching for out-of-sequence msg';
            goto exit;
          end;
          put 'MSMQReceiveMsg: failed';
          msg = sysmsg();
          put msg;
          goto exit;
        end;
        /* keep scanning until the expected sequence number shows up */
        if( seqno ^= input(substr(corrid2,21,4), hex4.) )
        then do;
          action2="PEEK_NEXT";
          goto peeknxt;
        end;
      end;

      /* increment sequence number for
         next expected message */
      seqno=seqno+1;
      /* retrieve record from internal buffer */
      call msmqgetparms(hmap, rc, msgbuf);
      if( rc ) then do;
        put 'MSMQGetParms: failed';
        msg = sysmsg();
        put msg;
        goto exit;
      end;
      /* length of this stream is embedded
         as 1st byte in msg */
      len = input(substr(msgbuf,1,1), pib1.);
      stream = substr(msgbuf,2);
      put 'write stream to file';
      rc = fput(fileid, substr(stream,1,len));
      if( rc ) then do;
        put 'Error writing to output file buffer...';
        goto exit;
      end;
      /* flush it to disk */
      rc = fwrite(fileid);
      if( rc ) then do;
        put 'Error writing to output file...';
        goto exit;
      end;

      /* now remove it from the queue...
         don't care about receiving body */
      /* NOTE(review): rc of the removing RECEIVE is not checked */
      body=0;
      if( outofseq ) then do;
        call msmqreceivemsg(hQueue, 0, "RECEIVE",
             hCursor2, 0, rc, "body", body);
        /* close this cursor */
        call msmqclosecursor(hCursor2, rc);
        hCursor2=0;
      end;
      else do;
        call msmqreceivemsg(hQueue, 0, "RECEIVE",
             hCursor, 0, rc, "body", body);
      end;
      /* we are now pointing at the next message */
      action="PEEK_CURRENT";
    end;
  end;
  /* finish retrieving all messages belonging
     to this file */
  goto recv;

  /* ---- cleanup: reached from every exit path ---- */
exit:
  /* cursors still open at this point are closed before the queue */
  if( hCursor2 ) then
    call msmqclosecursor(hCursor2, rc);
  if( hCursor ) then
    call msmqclosecursor(hCursor, rc);
  if( hQueue ) then do;
    call msmqclosequeue(hQueue, rc);
    if( rc ) then do;
      put 'MSMQCloseQueue: failed';
      msg = sysmsg();
      put msg;
    end;
  end;
  if( Qid ) then
    call msmqfree(Qid);
  if( hmap ) then
    call msmqfree(hmap);
  /* close file - only when FOPEN actually returned an id */
  if( fileid ) then do;
    rc = fclose(fileid);
    if( rc ) then put 'Error closing output file';
  end;
run;