首页 > 编程 > Java > 正文

java结合WebSphere MQ实现接收队列文件功能

2019-11-26 14:54:19
字体:
来源:转载
供稿:网友

首先简单介绍一下 WebSphere MQ 及其安装和使用方法。

WebSphere MQ:用于传输消息,具有跨平台的特性。

1 安装websphere mq 并启动

2 在 WebSphere MQ 中建立队列管理器(Queue Manager,如:MQSI_SAMPLE_QM)

3 建立队列(queue),类型选择 Local(如:lq)

4 建立通道(channel),类型选择 Server Connection(如:BridgeChannel)

接下来,我们来看实例代码:

// MQFileReceiver.java
package com.mq.dpca.file;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;
import com.mq.dpca.msg.MQConfig;
import com.mq.dpca.util.ReadCmdLine;
import com.mq.dpca.util.RenameUtil;

/**
 * Receives a file that was sent as an MQ message group, by actively polling a queue.
 *
 * <p>Connection settings are loaded from {@link MQConfig} by {@link #initproperty()}.
 * {@link #runGoupReceiver()} then connects, reads one message group under syncpoint,
 * appends every message payload to a single file and commits. The file name comes from
 * the {@code "fileName"} message property, passed through {@code RenameUtil.rename} and
 * prefixed with the configured directory. On any failure the unit of work is backed out
 * so the messages stay on the queue.
 */
public class MQFileReceiver {
  private MQQueueManager qmgr; // connection to the queue manager
  private MQQueue inQueue; // queue the file messages are read from
  private String queueName = ""; // queue name
  private String host = ""; // host name assigned to MQEnvironment.hostname
  private int port = 1414; // listener port
  private String channel = ""; // channel name
  private String qmgrName = ""; // queue manager name
  private MQMessage inMsg; // message buffer reused for every get
  private MQGetMessageOptions gmo; // get-message options
  private static String fileName = null; // value of the "fileName" property of the received group
  // NOTE(review): ccsid is loaded from the configuration but never applied to the
  // connection (e.g. MQEnvironment.CCSID) - confirm whether that is intentional.
  private int ccsid = 0;
  private static String file_dir = null; // prefix prepended to the received file name

  /**
   * Program entry point: loads the configuration and receives one file.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String args[]) {
    MQFileReceiver mfs = new MQFileReceiver();
    // Load the connection settings.
    mfs.initproperty();
    // Receive the file.
    mfs.runGoupReceiver();
    // Optional post-processing, disabled in the original: look up a shell script by file name.
//   String shellname = MQConfig.getValueByKey(fileName);
//   if(shellname!=null&&!"".equals(shellname)){
//     // invoke the shell script
//     ReadCmdLine.callShell(shellname);
//   }else{
//     System.out.println("have no shell name,Only receive files.");
//   }
  }

  /**
   * Connects, receives one message group into a file and commits the unit of work.
   *
   * <p>If anything fails (MQ error, file I/O error, incomplete group) the unit of work is
   * backed out, the stack trace is printed and the JVM exits with status 2. The queue and
   * the queue manager connection are released before returning or exiting.
   */
  public void runGoupReceiver() {
    int exitCode = 0;
    // A normal disconnect may implicitly commit pending work, so only disconnect
    // gracefully when the unit of work has been committed or backed out.
    boolean safeToDisconnect = true;
    try {
      init();
      receiveGroup();
      qmgr.commit();
      System.out.println("\n Messages successfully Receive ");
    } catch (Exception e) {
      e.printStackTrace();
      exitCode = 2;
      safeToDisconnect = backOut();
    } finally {
      if (safeToDisconnect) {
        closeConnection();
      }
    }
    // Exit only after the finally block: System.exit inside the try would skip the cleanup.
    if (exitCode != 0) {
      System.exit(exitCode);
    }
  }

  /**
   * Backs out the current unit of work, if a queue manager connection exists.
   *
   * @return {@code true} if there was nothing to back out or the back-out succeeded,
   *         {@code false} if the back-out itself failed
   */
  private boolean backOut() {
    if (qmgr == null) {
      return true;
    }
    try {
      System.out.println("\n Backing out Transaction ");
      qmgr.backout();
      return true;
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
  }

  /** Closes the queue and disconnects from the queue manager; failures are only reported. */
  private void closeConnection() {
    try {
      if (inQueue != null) {
        inQueue.close();
      }
    } catch (MQException e) {
      e.printStackTrace();
    } finally {
      inQueue = null;
    }
    try {
      if (qmgr != null) {
        qmgr.disconnect();
      }
    } catch (MQException e) {
      e.printStackTrace();
    } finally {
      qmgr = null;
    }
  }

  /**
   * Sets up the client connection and opens the input queue.
   *
   * @throws MQException if connecting to the queue manager or opening the queue fails
   */
  private void init() throws MQException {
    /* Client connection properties. */
    MQEnvironment.hostname = host;
    MQEnvironment.channel = channel;
    MQEnvironment.port = port;

    /* Connect to the queue manager. */
    qmgr = new MQQueueManager(qmgrName);

    /* Open the queue for input. */
    int opnOptn = MQConstants.MQOO_INPUT_AS_Q_DEF
        | MQConstants.MQOO_FAIL_IF_QUIESCING;
    inQueue = qmgr.accessQueue(queueName, opnOptn, null, null, null);
  }

  /**
   * Receives one message group into a file, reporting (not throwing) any failure.
   *
   * <p>Kept for backward compatibility: this method never throws; a failure is printed to
   * standard output. It does not commit or back out. {@link #runGoupReceiver()} uses the
   * throwing variant internally so that it can back out on failure.
   */
  public void getGroupMessages() {
    try {
      receiveGroup();
    } catch (Exception e) {
      System.out.println(e.getMessage());
    }
  }

  /**
   * Reads messages from the input queue and appends their payloads to the target file
   * until the last message of the group has been written.
   *
   * <p>Returns normally without creating a file when no message arrives within the wait
   * interval. Messages without a usable {@code "fileName"} property that arrive before
   * the file has been opened are skipped.
   *
   * @throws Exception on any MQ or file I/O failure, including running out of messages
   *         after the file has been opened (the file would be incomplete)
   */
  private void receiveGroup() throws Exception {
    gmo = new MQGetMessageOptions();
    gmo.options = MQConstants.MQGMO_FAIL_IF_QUIESCING
        | MQConstants.MQGMO_SYNCPOINT
        /* wait for a message ... */
        | MQConstants.MQGMO_WAIT
        /* ... only return messages once the whole group is available ... */
        | MQConstants.MQGMO_ALL_MSGS_AVAILABLE
        /* ... and return them in logical order */
        | MQConstants.MQGMO_LOGICAL_ORDER;
    /* Wait time limit in milliseconds. */
    gmo.waitInterval = 5000;
    gmo.matchOptions = MQConstants.MQMO_MATCH_GROUP_ID;
    inMsg = new MQMessage();

    FileOutputStream fos = null;
    try {
      while (true) {
        try {
          inQueue.get(inMsg, gmo);
        } catch (MQException mqe) {
          // An empty queue before any data was written is the normal "nothing to do" case.
          if (fos == null && mqe.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE) {
            return;
          }
          throw mqe;
        }

        if (fos == null) {
          fos = openTargetFile();
          if (fos == null) {
            System.out.println("Receiver the message without property,do nothing!");
            inMsg.clearMessage();
            continue;
          }
        }

        writePayload(fos);

        /* Stop once the last message of the group has been written. */
        if (gmo.groupStatus == MQConstants.MQGS_LAST_MSG_IN_GROUP) {
          System.out.println("Last Msg in Group");
          return;
        }
        inMsg.clearMessage();
      }
    } finally {
      if (fos != null) {
        fos.close();
      }
    }
  }

  /**
   * Opens the output file named by the current message's {@code "fileName"} property.
   *
   * @return the opened stream, or {@code null} if the message carries no usable
   *         {@code "fileName"} property
   * @throws Exception if the target name cannot be built or the file cannot be opened
   */
  private FileOutputStream openTargetFile() throws Exception {
    String name;
    try {
      name = inMsg.getStringProperty("fileName");
    } catch (MQException ignored) {
      // The property could not be read from this message; the caller skips the message.
      return null;
    }
    if (name == null) {
      return null;
    }
    fileName = name;
    // NOTE(review): the name comes from the message and is used in a file path; confirm
    // that RenameUtil.rename rejects path separators and "..".
    String fileNameFull = file_dir + RenameUtil.rename(fileName);
    return new FileOutputStream(new File(fileNameFull));
  }

  /**
   * Appends the full payload of the current message to the given stream.
   *
   * @param out stream of the file being assembled
   * @throws IOException if the message cannot be read or the file cannot be written
   */
  private void writePayload(FileOutputStream out) throws IOException {
    int msgLength = inMsg.getMessageLength();
    byte[] buffer = new byte[msgLength];
    inMsg.readFully(buffer);
    out.write(buffer, 0, msgLength);
  }

  /**
   * Loads the connection settings and the target directory from {@link MQConfig}.
   * Nothing is changed when the configuration has no queue manager name.
   */
  public void initproperty() {
    MQConfig config = new MQConfig().getInstance();
    if (config.getMQ_MANAGER() != null) {
      qmgrName = config.getMQ_MANAGER();
      queueName = config.getMQ_QUEUE_NAME();
      channel = config.getMQ_CHANNEL();
      host = config.getMQ_HOST_NAME();
      port = Integer.valueOf(config.getMQ_PROT());
      ccsid = Integer.valueOf(config.getMQ_CCSID());
      file_dir = config.getFILE_DIR();
    }
  }
}

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表