data _null_; length rc 8; length msg $ 200; length Qid hQueue transobj 8; length msgid $ 40; length hData hMap 8; length parm1 parm2 parm3 8; length parm4 $ 50; hQueue=0; hMap=0; hData=0; put '-------- Obtain formatname from pathname ------'; Qid=0; rc=0; call msmqpathtoformat("pcpad\testq", Qid, rc); if rc ^= 0 then do; if rc = input('03000EC0'x, ib4.) then do; /* C00E0003 - MSMQ QUEUE_NOT_FOUND error */ /* so create it... */ put 'Queue does not exist so creating it...'; call msmqcreatequeue(Qid, rc, "PATHNAME,LABEL", "pcpad\testq", "Test Queue"); if rc ^= 0 then do; put 'MSMQCreateQueue: failed'; msg = sysmsg(); put msg; goto exit; end; else put 'MSMQCreateQueue: succeeded'; end; else do; put 'MSMQPathToFormat: failed'; msg = sysmsg(); put msg; goto exit; end; end; else put 'MSMQPathToFormat: succeeded'; put '----------- Open queue for sending -----------'; call msmqopenqueue(Qid, "SEND", "SHARE", hQueue, rc); if rc ^= 0 then do; put 'MSMQOpenQueue: failed'; msg = sysmsg(); put msg; goto exit; end; else put 'MSMQOpenQueue: succeeded'; put '----------- Generate map descriptor -----------'; /* data will not be aligned */ desc1="SHORT"; desc2="LONG"; desc3="DOUBLE"; desc4="CHAR,,50"; /* blank pad to 50 bytes */ call msmqmap(hMap, rc, desc1, desc2, desc3, desc4); if rc ^= 0 then do; put 'MSMQMap: failed'; msg = sysmsg(); put msg; goto exit; end; else put 'MSMQMap: succeeded';
put '--- Generate data descriptor - actual data ----'; parm1=100; parm2=9999; parm3=9999.9999; parm4="This is a test."; call msmqsetparms(hData, hMap, rc, parm1, parm2, parm3, parm4); if rc ^= 0 then do; put 'MSMQSetParms: failed'; msg = sysmsg(); put msg; goto exit; end; else put 'MSMQSetParms: succeeded'; put '------------ Send message to queue ------------'; transobj=0; msgid=""; call msmqsendmsg(hQueue, hData, transobj, rc, "BODY_TYPE,CORRELATIONID,LABEL,MSGID, PRIV_LEVEL,RESP_QUEUE", 999, "0102030405060708090A0B0C0D0E0F1011121314", "Secret test message", msgid, "PRIVATE", "pcpad\respq"); if rc ^= 0 then do; put 'MSMQSendMsg: failed'; msg = sysmsg(); put msg; end; else do; put 'MSMQSendMsg: succeeded'; /* display MSMQ-generated MSGID */ put 'msgid is ' msgid; end; exit: if hQueue ^= 0 then do; put '----------------- Close queue ---------------'; call msmqclosequeue(hQueue, rc); if rc ^= 0 then do; put 'MSMQCloseQueue: failed'; msg = sysmsg(); put msg; end; else put 'MSMQCloseQueue: succeeded'; end; if Qid ^= 0 then do; call msmqfree(Qid); put 'Qid handle freed'; end; if hMap ^= 0 then do; call msmqfree(hMap); put 'Map descriptor handle freed'; end; if hData ^= 0 then do; call msmqfree(hData); put 'Data descriptor handle freed'; end; run;
data _null_; length rc 8; length msg $ 200; length Qid hQueue transobj 8; length hMap 8; length arrivet auth size sentt 8; length correlid msgid $ 40; length label $ 80; length parm1 parm2 parm3 8; length parm4 $ 50; length hRespQ 8; length respq $ 80; length respQid 8; hQueue=0; hMap=0; hRespQ=0; respQid=0; put '-------- Obtain formatname from pathname ------'; Qid=0; rc=0; call msmqpathtoformat("pcpad\testq", Qid, rc); if rc ^= 0 then do; put 'MSMQPathToFormat: failed'; msg = sysmsg(); put msg; goto exit; end; else put 'MSMQPathToFormat: succeeded'; put '----------- Open queue for receiving ---------'; call msmqopenqueue(Qid, "RECEIVE", "SHARE", hQueue, rc); if rc ^= 0 then do; put 'MSMQOpenQueue: failed'; msg = sysmsg(); put msg; goto exit; end; else put 'MSMQOpenQueue: succeeded'; put '----------Receive message from queue ---------'; transobj=0; hCursor=0; call msmqreceivemsg(hQueue, 0, "RECEIVE", hCursor, transobj, rc, "ARRIVEDTIME,AUTHENTICATED,BODY_SIZE, CORRELATIONID,LABEL,MSGID,RESP_QUEUE,SENTTIME", arrivet, auth, size, correlid, label, msgid, respq, sentt); if rc ^= 0 then do; put 'MSMQReceiveMsg: failed'; msg = sysmsg(); put msg; goto exit; end; else do; put 'MSMQReceiveMsg: succeeded'; /* convert MSMQ arrived time to SAS datetime format */ arrivet = arrivet + 10*365*24*3600 + 3*24*3600 - 5*3600; put 'arrived time is' arrivet datetime.; if auth = 1 then put 'message was authenticated'; else put 'message was not authenticated'; put 'message body size is ' size; put 'correlation id is ' correlid; put 'label is ' label; put 'msg id is ' msgid; put 'resp_queue Qid handle is ' respq; /* convert MSMQ sent time to SAS datetime format */ sentt = sentt + 10*365*24*3600 + 3*24*3600 - 5*3600; put 'sent time was' sentt datetime.; end;
if size ^= 0 then do; put '---------- Generate map descriptor ----------'; desc1="SHORT"; desc2="LONG"; desc3="DOUBLE"; desc4="CHAR,,50"; call msmqmap(hMap, rc, desc1, desc2, desc3, desc4); if rc ^= 0 then do; put 'MSMQMap: failed'; msg = sysmsg(); put msg; goto exit; end; else put 'MSMQMap: succeeded'; call msmqgetparms(hMap, rc, parm1, parm2, parm3, parm4); if rc ^= 0 then do; put 'MSMQGetParms: failed'; msg = sysmsg(); put msg; goto exit; end; else do; put 'MSMQGetParms: succeeded'; put 'parm1 = ' parm1; put 'parm2 = ' parm2; put 'parm3 = ' parm3; put 'parm4 = ' parm4; end; end; else put 'No data was associated with the message'; /* post a reply to the response queue if available */ if respq ^= "" then do; call msmqpathtoformat(respq, respQid, rc); if rc ^= 0 then do; put 'MSMQPathToFormat: failed to open response queue'; msg = sysmsg(); goto exit; end; call msmqopenqueue(respQid, "SEND", "SHARE", hRespQ, rc); if rc ^= 0 then do; put 'MSMQOpenQueue: failed to open response queue'; msg = sysmsg(); put msg; goto exit; end; hMap=0; call msmqsetparms(hData, hMap, rc, "Message received OK"); if rc ^= 0 then do; put 'MSMQSetParms: failed to send response message'; msg = sysmsg(); put msg; goto exit; end; transobj=0; call msmqsendmsg(hRespQ, hData, transobj, rc); if rc ^= 0 then do; put 'MSMQSendMsg: failed to send response message'; msg = sysmsg(); put msg; end; else put 'reply sent to the response queue'; end;
exit: if hQueue ^= 0 then do; put '----------------- Close queue ---------------'; call msmqclosequeue(hQueue, rc); if rc ^= 0 then do; put 'MSMQCloseQueue: failed'; msg = sysmsg(); put msg; end; else put 'MSMQCloseQueue: succeeded'; end; if hRespQ ^= 0 then do; put '------------ Close Response Queue -----------'; call msmqclosequeue(hRespQ, rc); if rc ^= 0 then do; put 'MSMQCloseQueue: failed to close response queue'; msg = sysmsg(); put msg; end; else put 'MSMQCloseQueue: succeeded to close response queue'; end; if Qid ^= 0 then do; call msmqfree(Qid); put 'Qid handle freed'; end; if respQid ^= 0 then do; call msmqfree(respQid); put 'respQid handle freed'; end; if hMap ^= 0 then do; call msmqfree(hMap); put 'Map descriptor handle freed'; end; run;
data _null_; length rc 8; length msg $ 200; length Qid hQueue hmap 8; length appspec 8; length corrid $ 40; length record $ 256; length seqno 8 seqstr $ 4; /* send this file to the queue */ infile 'd:\test.txt' length=reclen end=eof; put '--------- Obtain Formatname from Pathname -------'; call msmqpathtoformat(".\testq", Qid, rc); if( rc ) then do; if( rc = input('03000EC0'x,ib4.) ) then do; put 'Queue does not exist so create it...'; call msmqcreatequeue(Qid, rc, "pathname,label", ".\testq", "test queue"); if( rc ) then do; put 'MSMQCreateQueue: failed'; msg = sysmsg(); put msg; goto exit; end; end; else do; put 'MSMQPathToFormat: failed'; msg = sysmsg(); put msg; goto exit; end; end; put '------------------ Open Queue ------------------'; call msmqopenqueue(Qid, "SEND", "SHARE", hQueue, rc); if( rc ) then do; put 'MSMQOpenQueue: failed'; msg = sysmsg(); put msg; goto exit; end; put '----------- Generate map descriptor -----------'; /* longest record in file is 255 bytes+1 length byte... */ /* therefore all messages on the queue pertaining to */ /* this file will be blank-padded for 256 bytes... */ call msmqmap(hmap, rc, "char,,256"); if rc ^= 0 then do; put 'MSMQMap: failed'; msg = sysmsg(); put msg; goto exit; end; /* designate that messages belong to a text file */ appspec=100000; /* all of these messages will have the same correlationid+seqno */ corrid="46696c65212121"; /* File!!! */ seqno = 0; do until(eof); input @; input record $varying256. reclen; call msmqsetparms(hdata, hmap, rc, record); if( rc ) then do; put 'MSMQSetParms: failed'; msg = sysmsg(); put msg; goto exit; end;
/* add sequence # to correlationid */ seqstr = put(seqno, hex4.); substr(corrid,15,4) = seqstr; seqno = seqno+1; put '--- Send message to queue ----'; call msmqsendmsg(hQueue, hdata, 0, rc, "appspecific,correlationid", appspec, corrid); if( rc ) then do; put 'MSMQSendMsg: failed'; msg = sysmsg(); put msg; goto exit; end; /* free data */ call msmqfree(hdata); end; exit: if( hQueue ) then do; call msmqclosequeue(hQueue, rc); if( rc ) then do; put 'MSMQCloseQueue: failed'; msg = sysmsg(); put msg; end; end; if( Qid ) then call msmqfree(Qid); if( hmap ) then call msmqfree(hmap); stop; run;
filename output 'd:\testdup.txt'; data _null_; length rc 8; length msg $ 200; length Qid hQueue hmap hCursor hCursor2 8; length corrid corrid2 filecorrid $ 40; length appspec 8; length action action2 $ 12; length record $ 256; length seqno 8; fileid = fopen('output', 'o', 256, 'v'); if( fileid = 0 ) then do; put 'Error opening output file...'; goto exit; end; put '--------- Obtain Formatname from Pathname -------'; call msmqpathtoformat(".\testq", Qid, rc); if( rc ) then do; put 'MSMQPathToFormat: failed'; msg = sysmsg(); put msg; goto exit; end; put '------------------ Open Queue ------------------'; call msmqopenqueue(Qid, "RECEIVE", "SHARE", hQueue, rc); if( rc ) then do; put 'MSMQOpenQueue: failed'; msg = sysmsg(); put msg; goto exit; end; call msmqcreatecursor(hQueue, hCursor, rc); if( rc ) then do; put 'MSMQCreateCursor failed'; msg = sysmsg(); put msg; end; /* peek first to see if belongs to the file you want */ action="PEEK_CURRENT"; seqno=0; recv: call msmqreceivemsg(hQueue, 0, action, hCursor, 0, rc, "APPSPECIFIC,CORRELATIONID", appspec, corrid); if( rc ) then do; if( rc = input('1B000EC0'x,ib4.) ) then do; put 'reached end of queue'; goto exit; end; put 'MSMQReceiveMsg: failed'; msg = sysmsg(); put msg; goto exit; end; /* default action */ action="PEEK_NEXT"; if( appspec = 100000 ) then do; /* file processing... */ outofseq=0; if( filecorrid = "" ) then do; /* file begins at this message */ /* write all correlating messages to this file */ filecorrid = substr(corrid,1,14); put '--------- Generate map descriptor ---------'; /* all file messages were sent to the queue as 256 bytes blank-padded */ call msmqmap(hmap, rc, "char,,256"); if( rc ) then do; put 'MSMQMap: failed'; msg = sysmsg(); put msg; goto exit; end; end;
/* make sure message belongs to this file */ if( substr(corrid,1,14) = filecorrid ) then do; if( seqno ^= input(substr(corrid,15,4), hex4.) ) then do; /* this message is out of sequence so search for it */ outofseq=1; call msmqcreatecursor(hQueue, hCursor2, rc); if( rc ) then do; put 'MSMQCreateCursor failed'; msg = sysmsg(); put msg; end; action2="PEEK_CURRENT"; peeknxt: call msmqreceivemsg(hQueue, 0, action2, hCursor2, 0, rc, "CORRELATIONID", corrid2); if( rc ) then do; if( rc = input('1B000EC0'x,ib4.) ) then do; put 'Error: reached end of queue while searching for out-of-sequence msg'; goto exit; end; put 'MSMQReceiveMsg: failed'; msg = sysmsg(); put msg; goto exit; end; if( seqno ^= input(substr(corrid2,15,4), hex4.) ) then do; action2="PEEK_NEXT"; goto peeknxt; end; end; /* increment sequence number for next expected message */ seqno=seqno+1; /* retrieve record from internal buffer */ call msmqgetparms(hmap, rc, record); if( rc ) then do; put 'MSMQGetParms: failed'; msg = sysmsg(); put msg; goto exit; end; put 'write record to file'; rc = fput(fileid, record); if( rc ) then do; put 'Error writing to output file buffer...'; goto exit; end; /* flush it to disk */ rc = fwrite(fileid); if( rc ) then do; put 'Error writing to output file...'; goto exit; end;
/* now remove it from the queue... don't care about receiving body */ body=0; if( outofseq ) then do; call msmqreceivemsg(hQueue, 0, "RECEIVE", hCursor2, 0, rc, "body", body); /* close this cursor */ call msmqclosecursor(hCursor2, rc); end; else do; call msmqreceivemsg(hQueue, 0, "RECEIVE", hCursor, 0, rc, "body", body); end; /* we are now pointing at the next message */ action="PEEK_CURRENT"; end; end; /* finish retrieving all messages belonging to this file */ goto recv; exit: if( hQueue ) then do; call msmqclosequeue(hQueue, rc); if( rc ) then do; put 'MSMQCloseQueue: failed'; msg = sysmsg(); put msg; end; end; if( Qid ) then call msmqfree(Qid); if( hmap ) then call msmqfree(hmap); /* close file */ rc = fclose(fileid); if( rc ) then put 'Error closing output file'; run;
data _null_; length rc 8; length msg $ 200; length Qid hQueue hmap 8; length appspec 8; length corrid $ 40; length msgbuf $ 256; length seqno 8 seqstr $ 4; /* read in as a stream of bytes */ infile 'd:\test.exe' recfm=f lrecl=1 end=eof; put '--------- Obtain Formatname from Pathname -------'; call msmqpathtoformat(".\testq", Qid, rc); if( rc ) then do; if( rc = input('03000EC0'x,ib4.) ) then do; put 'Queue does not exist so create it'; call msmqcreatequeue(Qid, rc, "pathname,label", ".\testq", "test queue:"); if( rc ) then do; put 'MSMQCreateQueue: failed'; msg = sysmsg(); put msg; goto exit; end; end; else do; put 'MSMQPathToFormat: failed'; msg = sysmsg(); put msg; goto exit; end; end; put '------------------ Open Queue ------------------'; call msmqopenqueue(Qid, "SEND", "SHARE", hQueue, rc); if( rc ) then do; put 'MSMQOpenQueue: failed'; msg = sysmsg(); put msg; goto exit; end; put '----------- Generate map descriptor -----------'; /* send 256 byte messages to the queue */ call msmqmap(hmap, rc, "char,,256"); if( rc ) then do; put 'MSMQMap: failed'; msg = sysmsg(); put msg; goto exit; end; /* designate messages belong to a binary file */ appspec=100001; /* all of these messages will have the same correlationid */ corrid="42696e46696c65212121"; /* BinFile!!! */ seqno = 0; i=1; do until(eof); /* read a byte at a time */ input x $char1.; i+1; substr(msgbuf,i,1) = x; if i = 256 or eof then do; /* set length of this record embedded as first byte of message */ substr(msgbuf,1,1) = put(i-1,pib1.); call msmqsetparms(hdata, hmap, rc, msgbuf); if( rc ) then do; put 'MSMQSetParms: failed'; msg = sysmsg(); put msg; goto exit; end; /* add sequence # to correlationid */ seqstr = put(seqno, hex4.); substr(corrid,21,4) = seqstr; seqno = seqno + 1; put '--- Send message to queue ----'; call msmqsendmsg(hQueue, hdata, 0, rc, "appspecific,correlationid,acknowledge, admin_queue", appspec, corrid, "nack_reach_queue", ".\adminq"); if( rc ) then do; put 'MSMQSendMsg: failed'; msg = sysmsg(); put msg; goto exit; end;
/* free data */ call msmqfree(hdata); /* reset message buffer entities */ i=1; msgbuf=""; end; end; exit: if( hQueue ) then do; call msmqclosequeue(hQueue, rc); if( rc ) then do; put 'MSMQCloseQueue: failed'; msg = sysmsg(); put msg; end; end; if( Qid ) then call msmqfree(Qid); if( hmap ) then call msmqfree(hmap); stop; run;
filename output 'd:\testdup.exe'; data _null_; length rc 8; length msg $ 200; length Qid hQueue hmap hCursor hCursor2 8; length corrid corrid2 filecorrid $ 40; length appspec 8; length action action2 $ 12; length msgbuf stream $ 256; length len 8; length seqno 8; fileid = fopen('output', 'o', 0, 'b'); if( fileid = 0 ) then do; put 'Error opening output file...'; goto exit; end; put '--------- Obtain Formatname from Pathname -------'; call msmqpathtoformat(".\testq", Qid, rc); if( rc ) then do; put 'MSMQPathToFormat: failed'; msg = sysmsg(); put msg; goto exit; end; put '------------------ Open Queue ------------------'; call msmqopenqueue(Qid, "RECEIVE", "SHARE", hQueue, rc); if( rc ) then do; put 'MSMQOpenQueue: failed'; msg = sysmsg(); put msg; goto exit; end; call msmqcreatecursor(hQueue, hCursor, rc); if( rc ) then do; put 'MSMQCreateCursor failed'; msg = sysmsg(); put msg; end; /* peek first to see if belongs to the file you want */ action="PEEK_CURRENT"; seqno=0; recv: call msmqreceivemsg(hQueue, 0, action, hCursor, 0, rc, "APPSPECIFIC,CORRELATIONID", appspec, corrid); if( rc ) then do; if( rc = input('1B000EC0'x,ib4.) ) then do; put 'reached end of queue'; goto exit; end;
put 'MSMQReceiveMsg: failed'; msg = sysmsg(); put msg; goto exit; end; /* default action */ action="PEEK_NEXT"; if( appspec = 100001 ) then do; /* file processing */ outofseq=0; if( filecorrid = "" ) then do; /* file begins at this message */ /* write all correlating messages to this file */ filecorrid = substr(corrid,1,20); put '--------- Generate map descriptor ---------'; /* all file messages were sent to the queue as 256 bytes blank-padded */ call msmqmap(hmap, rc, "char,,256"); if( rc ) then do; put 'MSMQMap: failed'; msg = sysmsg(); put msg; goto exit; end; end; /* make sure message belongs to this file */ if( substr(corrid,1,20) = filecorrid ) then do; if( seqno ^= input(substr(corrid,21,4), hex4.) ) then do; /* this message is out of sequence so search for it */ outofseq=1; call msmqcreatecursor(hQueue, hCursor2, rc); if( rc ) then do; put 'MSMQCreateCursor failed'; msg = sysmsg(); put msg; goto exit; end; action2="PEEK_CURRENT"; peeknxt: call msmqreceivemsg(hQueue, 0, action2, hCursor2, 0, rc, "CORRELATIONID", corrid2); if( rc ) then do; if( rc = input('1B000EC0'x, ib4.) ) then do; put 'Error: reached end of queue while searching for out-of-sequence msg'; goto exit; end; put 'MSMQReceiveMsg: failed'; msg = sysmsg(); put msg; goto exit; end; if( seqno ^= input(substr(corrid2,21,4), hex4.) ) then do; action2="PEEK_NEXT"; goto peeknxt; end; end; /* increment sequence number for next expected message */ seqno=seqno+1; /* retrieve record from internal buffer */ call msmqgetparms(hmap, rc, msgbuf); if( rc ) then do; put 'MSMQGetParms: failed'; msg = sysmsg(); put msg; goto exit; end; /* length of this stream is embedded as 1st byte in msg */ len = input(substr(msgbuf,1,1), pib1.); stream = substr(msgbuf,2); put 'write stream to file'; rc = fput(fileid, substr(stream,1,len)); if( rc ) then do; put 'Error writing to output file buffer...'; goto exit; end; /* flush it to disk */ rc = fwrite(fileid); if( rc ) then do; put 'Error writing to output file...'; goto exit; end;
/* now remove it from the queue... don't care about receiving body */ body=0; if( outofseq ) then do; call msmqreceivemsg(hQueue, 0, "RECEIVE", hCursor2, 0, rc, "body", body); /* close this cursor */ call msmqclosecursor(hCursor2, rc); end; else do; call msmqreceivemsg(hQueue, 0, "RECEIVE", hCursor, 0, rc, "body", body); end; /* we are now pointing at the next message */ action="PEEK_CURRENT"; end; end; /* finish retrieving all messages belonging to this file */ goto recv; exit: if( hQueue ) then do; call msmqclosequeue(hQueue, rc); if( rc ) then do; put 'MSMQCloseQueue: failed'; msg = sysmsg(); put msg; end; end; if( Qid ) then call msmqfree(Qid); if( hmap ) then call msmqfree(hmap); /* close file */ rc = fclose(fileid); if( rc ) then put 'Error closing output file'; run;