首页 > 编程 > Java > 正文

Java HTTP协议收发MQ 消息代码实例详解

2019-11-26 12:32:18
字体:
来源:转载
供稿:网友

1. 准备环境

在工程 POM 文件添加 HTTP Java 客户端的依赖。

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-client</artifactId>
  <version>9.3.4.RC1</version>
</dependency>
<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.1.11</version>
</dependency>

2. 运行代码配置(user.properties)

您需要设置配置文件(user.properties)的相关内容,具体请参考“申请 MQ 资源”。

# 您在控制台创建的Topic
Topic=xxx
# 公测url
URL=http://publictest-rest.ons.aliyun.com/
# 阿里云身份验证码
Ak=xxx
# 阿里云身份验证密钥
Sk=xxx
# MQ控制台创建的Producer ID
ProducerID=xxx
# MQ控制台创建的Consumer ID
ConsumerID=xxx

说明:URL 中的 Key,Tag以及 POST Content-Type 没有任何的限制,只要确保Key 和 Tag 相同唯一即可,可以放在 user.properties 里面。

3. HTTP 发送消息示例代码

您可以按以下说明设置相应参数并测试 HTTP 消息发送功能。

package com.aliyun.openservice.ons.http.demo;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Properties;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;

import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;

/**
 * Demo that publishes ten messages to an MQ topic over HTTP.
 *
 * <p>Connection settings (Topic, URL, Ak, Sk, ProducerID) are read from
 * {@code user.properties} on the classpath. Each request is signed with
 * {@link AuthUtil#calSignature} over the string
 * {@code topic \n producerId \n md5(body) \n time}.
 */
public class HttpProducer {
  public static String SIGNATURE = "Signature";
  public static String NUM = "num";
  public static String CONSUMERID = "ConsumerID";
  public static String PRODUCERID = "ProducerID";
  public static String TIMEOUT = "timeout";
  public static String TOPIC = "Topic";
  public static String AK = "AccessKey";
  public static String BODY = "body";
  public static String MSGHANDLE = "msgHandle";
  public static String TIME = "time";

  /** Separator between the fields of the string to sign; must be a real line feed. */
  private static final String NEWLINE = "\n";

  /** Number of messages the demo sends before exiting. */
  private static final int MESSAGE_COUNT = 10;

  /**
   * Sends {@value #MESSAGE_COUNT} messages and prints the status and body of every response.
   *
   * @param args unused
   * @throws Exception if the configuration cannot be loaded or a request fails
   */
  public static void main(String[] args) throws Exception {
    // Load the configuration first so a bad config never leaves a started client behind.
    Properties properties = loadUserProperties();
    String topic = properties.getProperty("Topic"); // configure your Topic in user.properties
    String url = normalizeBaseUrl(properties.getProperty("URL")); // public-test cluster: http://publictest-rest.ons.aliyun.com/
    String ak = properties.getProperty("Ak"); // configure your Ak in user.properties
    String sk = properties.getProperty("Sk"); // configure your Sk in user.properties
    String pid = properties.getProperty("ProducerID"); // configure your Producer ID in user.properties
    String body = "hello ons http";

    HttpClient httpClient = new HttpClient();
    httpClient.setMaxConnectionsPerDestination(1);
    httpClient.start();
    try {
      for (int i = 0; i < MESSAGE_COUNT; i++) {
        // The same timestamp must appear in the URL and in the signed string.
        String date = String.valueOf(new Date().getTime());
        Request req = httpClient.POST(
            url + "message/?topic=" + topic + "&time=" + date + "&tag=http" + "&key=http");
        ContentProvider content = new StringContentProvider(body);
        req.content(content);

        String signString = topic + NEWLINE + pid + NEWLINE
            + MD5.getInstance().getMD5String(body) + NEWLINE + date;
        System.out.println(signString);
        String sign = AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);

        req.header(SIGNATURE, sign);
        req.header(AK, ak);
        req.header(PRODUCERID, pid);

        ContentResponse response = req.send();
        System.out.println("send msg:" + response.getStatus() + response.getContentAsString());
      }
    } finally {
      // Stop the client so its threads are released when the demo finishes or fails.
      httpClient.stop();
    }
  }

  /**
   * Loads {@code user.properties} from the classpath and closes the stream afterwards.
   *
   * @return the loaded properties
   * @throws IOException if the resource is missing or cannot be read
   */
  private static Properties loadUserProperties() throws IOException {
    Properties properties = new Properties();
    try (InputStream in =
        HttpProducer.class.getClassLoader().getResourceAsStream("user.properties")) {
      if (in == null) {
        throw new FileNotFoundException("user.properties not found on the classpath");
      }
      properties.load(in);
    }
    return properties;
  }

  /**
   * Returns the configured base URL with exactly one trailing slash, so that appending
   * {@code "message/..."} works whether or not the configured value ends with {@code /}.
   *
   * @param url value of the {@code URL} property
   * @return the trimmed URL ending in {@code /}
   * @throws IllegalStateException if the property is missing or blank
   */
  private static String normalizeBaseUrl(String url) {
    if (url == null || url.trim().isEmpty()) {
      throw new IllegalStateException("URL is not configured in user.properties");
    }
    String trimmed = url.trim();
    return trimmed.endsWith("/") ? trimmed : trimmed + "/";
  }
}

4. HTTP接收消息示例代码

请按以下说明设置相应参数并测试 HTTP 消息接收功能。

package com.aliyun.openservice.ons.http.demo;import java.nio.charset.Charset;import java.util.Date;import java.util.List;import java.util.Properties;import org.eclipse.jetty.client.HttpClient;import org.eclipse.jetty.client.api.ContentProvider;import org.eclipse.jetty.client.api.ContentResponse;import org.eclipse.jetty.client.api.Request;import org.eclipse.jetty.client.util.StringContentProvider;import org.eclipse.jetty.http.HttpMethod;import com.alibaba.fastjson.JSON;import com.aliyun.openservice.ons.mqtt.demo.MqttProducer;import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;public class HttpConsumer {  public static String SIGNATURE="Signature";  public static String NUM="num";  public static String CONSUMERID="ConsumerID";  public static String PRODUCERID="ProducerID";  public static String TIMEOUT="timeout";  public static String TOPIC="Topic";  public static String AK="AccessKey";  public static String BODY="body";   public static String MSGHANDLE="msgHandle";  public static String TIME="time";  public static void main(String[] args) throws Exception {    HttpClient httpClient=new HttpClient();     httpClient.setMaxConnectionsPerDestination(1);    httpClient.start();     Properties properties=new Properties();    properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties"));    String topic=properties.getProperty("Topic"); //请在user.properties配置您的topic    String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/    String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak    String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk    String cid=properties.getProperty("ConsumerID");//请在user.properties配置您的Consumer ID    String date=String.valueOf(new Date().getTime());     String sign=null;    String NEWLINE="/n";    String signString;    System.out.println(NEWLINE+NEWLINE);    while (true) {       try {        date=String.valueOf(new Date().getTime());        Request 
req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&num="+32);        req.method(HttpMethod.GET);        ContentResponse response;        signString=topic+NEWLINE+cid+NEWLINE+date;        sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);        req.header(SIGNATURE, sign);        req.header(AK, ak);        req.header(CONSUMERID, cid);        long start=System.currentTimeMillis();        response=req.send();        System.out.println("get cost:"+(System.currentTimeMillis()-start)/1000                   +"  "+response.getStatus()+"  "+response.getContentAsString());         List<SimpleMessage> list = null;        if (response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()) {           list=JSON.parseArray(response.getContentAsString(), SimpleMessage.class);        }        if (list==null||list.size()==0) {          Thread.sleep(100);          continue;        }         System.out.println("size is :"+list.size());        for (SimpleMessage simpleMessage : list) {          date=String.valueOf(new Date().getTime());          System.out.println("receive msg:"+simpleMessage.getBody()+"  born time "+simpleMessage.getBornTime());          req=httpClient.POST(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date);          req.method(HttpMethod.DELETE);          signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date;          sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);          req.header(SIGNATURE, sign);          req.header(AK, ak);          req.header(CONSUMERID, cid);          response=req.send();            System.out.println("delete msg:"+response.toString());        }         Thread.sleep(100);      } catch (Exception e) {        e.printStackTrace();      }    }  }}

5. HTTP示例程序工具类

(1)消息封装类: SimpleMessage.java

package com.aliyun.openservice.ons.http.demo;public class SimpleMessage {  private String body;  private String msgId;  private String bornTime;  private String msgHandle;  private int reconsumeTimes;  private String tag;  public void setTag(String tag) {    this.tag = tag;  }  public String getTag() {    return tag;  }  public int getReconsumeTimes() {    return reconsumeTimes;  }  public void setReconsumeTimes(int reconsumeTimes) {    this.reconsumeTimes = reconsumeTimes;  }  public void setMsgHandle(String msgHandle) {    this.msgHandle = msgHandle;  }  public String getMsgHandle() {    return msgHandle;  }  public String getBody() {    return body;  }  public void setBody(String body) {    this.body = body;  }  public String getMsgId() {    return msgId;  }  public void setMsgId(String msgId) {    this.msgId = msgId;  }  public String getBornTime() {    return bornTime;  }  public void setBornTime(String bornTime) {    this.bornTime = bornTime;  }}

(2)字符串签名类: MD5.java

package com.aliyun.openservice.ons.http.demo;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.LoggerFactory;

/**
 * Singleton helper that computes MD5 digests and converts them to and from their
 * 32-character hexadecimal form.
 *
 * <p>A single {@link MessageDigest} instance is shared; every digest computation is
 * performed while holding {@code opLock}.
 */
public class MD5 {
  private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);

  /** Lowercase hexadecimal alphabet used when rendering a digest. */
  private static final char[] DIGITS =
      { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };

  private static final MD5 me = new MD5();

  private final MessageDigest mHasher;
  private final ReentrantLock opLock = new ReentrantLock();

  private MD5() {
    try {
      this.mHasher = MessageDigest.getInstance("md5");
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /** @return the shared instance */
  public static MD5 getInstance() {
    return me;
  }

  /**
   * @param content text to hash; encoded as UTF-8
   * @return the digest as 32 lowercase hexadecimal characters
   */
  public String getMD5String(String content) {
    return this.bytes2string(this.hash(content));
  }

  /**
   * @param content bytes to hash
   * @return the digest as 32 lowercase hexadecimal characters
   */
  public String getMD5String(byte[] content) {
    return this.bytes2string(this.hash(content));
  }

  /**
   * @param content bytes to hash
   * @return the 16-byte digest
   */
  public byte[] getMD5Bytes(byte[] content) {
    return this.hash(content);
  }

  /**
   * Hashes the UTF-8 encoding of {@code str}.
   *
   * @param str text to hash
   * @return the 16-byte digest
   */
  public byte[] hash(String str) {
    // Passing a Charset avoids the checked UnsupportedEncodingException of getBytes(String).
    return this.hash(str.getBytes(Charset.forName("UTF-8")));
  }

  /**
   * Hashes {@code data} while holding the lock that guards the shared digest.
   *
   * @param data bytes to hash
   * @return the 16-byte digest
   * @throws IllegalArgumentException if the digest is not 16 bytes long
   */
  public byte[] hash(byte[] data) {
    this.opLock.lock();
    try {
      byte[] bt = this.mHasher.digest(data);
      if (null == bt || bt.length != 16) {
        throw new IllegalArgumentException("md5 need");
      }
      return bt;
    } finally {
      this.opLock.unlock();
    }
  }

  /**
   * Renders each byte as two lowercase hexadecimal characters, high nibble first.
   *
   * @param bt bytes to render
   * @return a string of length {@code 2 * bt.length}
   */
  public String bytes2string(byte[] bt) {
    int l = bt.length;
    char[] out = new char[l << 1];
    for (int i = 0, j = 0; i < l; i++) {
      out[j++] = DIGITS[(0xF0 & bt[i]) >>> 4];
      out[j++] = DIGITS[0x0F & bt[i]];
    }
    String hex = new String(out);
    log.debug("[hash]{}", hex);
    return hex;
  }

  /**
   * Parses a 32-character hexadecimal string back into 16 bytes. Both lowercase and
   * uppercase hex digits are accepted.
   *
   * @param str hexadecimal text of length 32
   * @return the 16 decoded bytes
   * @throws NullPointerException if {@code str} is {@code null}
   * @throws IllegalArgumentException if the length is not 32 or a character is not a hex digit
   */
  public byte[] string2bytes(String str) {
    if (null == str) {
      throw new NullPointerException("Argument is not allowed empty");
    }
    if (str.length() != 32) {
      throw new IllegalArgumentException("String length must equals 32");
    }
    byte[] data = new byte[16];
    for (int i = 0; i < 16; ++i) {
      int h = hexValue(str.charAt(i * 2));
      int l = hexValue(str.charAt(i * 2 + 1));
      data[i] = (byte) ((h << 4) | l);
    }
    return data;
  }

  /**
   * Returns the numeric value (0-15) of an ASCII hexadecimal digit.
   *
   * @throws IllegalArgumentException if {@code c} is not one of 0-9, a-f or A-F
   */
  private static int hexValue(char c) {
    if (c >= '0' && c <= '9') {
      return c - '0';
    }
    if (c >= 'a' && c <= 'f') {
      return c - 'a' + 10;
    }
    if (c >= 'A' && c <= 'F') {
      return c - 'A' + 10;
    }
    throw new IllegalArgumentException("Not a hexadecimal character: '" + c + "'");
  }
}

希望本篇文章对您有所帮助

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表