剑客
关注科技互联网

ActiveMQ 寻根究低 – 点对点模式详解

本文在 ActiveMQ 寻根究低 – JMS 规范概述 一问的基础上,探究点对点模式的具体变成实现方案,以及点对点模式在实际应用的的具体场景。

JMS 程序设计模式

JMS 程序的模式主要是给予PTP 和 Pub/Sub 配置一些参数的不同来应用到不同的场景中。根据具体的业务对性能、对数据的安全性等再进行最终方案的选型。通用的结构如下图:

ActiveMQ 寻根究低 - 点对点模式详解

点对点(PTP)模式的实现

PTP 模式生产者

非事务方式的生产者

非事务的生产者需要将 connection.createSession(transacted, acknowledge) 的第一个参数设置为 FALSE 。第二个参数指明的消息接受者的应答方法。

classPTPSyncQueueProducerimplementsRunnable{
privatefinalstaticLogger LOG = LoggerFactory.getLogger(PTPSyncQueueProducer.class);
 Connection connection;
 Session session;
 Destination destination;
 MessageProducer messageProducer;

@Override
publicvoidrun(){
inti =0;
while(true) {
try{
 connection = ConnectionUtils.getConnectionFactory().createConnection();
 connection.start();
// first param is transacted
 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
 destination = session.createQueue("PTPSyncQueueProducer");
 messageProducer = session.createProducer(destination);
 ObjectMessage objectMessage = session.createObjectMessage();
 objectMessage.setObject(newObj(++i,"littler Bai"));

 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 messageProducer.setPriority(Message.DEFAULT_PRIORITY);
 messageProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

 messageProducer.send(objectMessage);
 LOG.info("send {}", i);
 TimeUnit.SECONDS.sleep(5);
 } catch(JMSException e) {
 ConnectionUtils.close(messageProducer, session, connection);
 } catch(InterruptedException e) {
 e.printStackTrace();
 }
 }
 }
}

基于事务的生成者

基于事务的生产者。只有再事务提交时,消息才已打包的方式一起发送到JMS Provider。

publicvoidtransactedProducer(){
try{
 connection = ConnectionUtils.getConnectionFactory().createConnection();
 connection.start();
// first param is transacted,if true transaction
 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
 destination = session.createQueue("PTPSyncQueueProducer");
 messageProducer = session.createProducer(destination);

 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 messageProducer.setPriority(Message.DEFAULT_PRIORITY);
 messageProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

for(inti =0; i <10; i++) {
 ObjectMessage objectMessage = session.createObjectMessage();
 objectMessage.setObject(newObj(i,"littler Bai"));
 messageProducer.send(objectMessage);
 LOG.info("send {}", i);
 }
 System.out.println("begin sleep");
 TimeUnit.SECONDS.sleep(10);
 session.commit();

 } catch(JMSException e) {
try{
 session.rollback();
 } catch(JMSException ex) {
 ex.printStackTrace();
 }
 e.printStackTrace();
 } catch(InterruptedException e) {
 e.printStackTrace();
 } finally{
 ConnectionUtils.close(messageProducer, session, connection);
 }
 }

PTP 模式消费者

基于阻塞的消费者

基于阻塞的消费者采用 MessageConsumer.receive(timeout) 方法来阻塞当前线程,循环获取监测的 Destination 的消息。

classPTPQueueCustomer{
 Connection connection;
 Session session;
 Destination destination;
 MessageConsumer messageConsumer;

publicvoidconsume(){
try{
 connection = ConnectionUtils.getConnectionFactory().createConnection();
 connection.start();
 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
 destination = session.createQueue("PTPSyncQueueProducer");
 messageConsumer = session.createConsumer(destination);
while(true) {
 Message message = messageConsumer.receive(1000);
if(messageinstanceofObjectMessage) {
 ObjectMessage objectMessage = (ObjectMessage) message;
 System.out.println(objectMessage.getObject());
 }
 }
 } catch(JMSException e) {
 e.printStackTrace();
 } finally{
 ConnectionUtils.close(messageConsumer, session, connection);
 }
 }
publicstaticvoidmain(String[] args){
//new PTPQueueCustomer().consume();
newPTPQueueCustomer().asyncConsume();
 }
}

基于MessageListener 的异步消费者

基于 MessageListener 的异步消费者需要实现该接口,并在 MessageConsumer.setMessageListener(listener) 方法上注册。

classQueueMessageListenerimplementsMessageListener{
@Override
publicvoidonMessage(Message message){
if(messageinstanceofObjectMessage) {
 ObjectMessage objectMessage = (ObjectMessage) message;
try{
 System.out.println(objectMessage.getObject());
 } catch(JMSException e) {
 e.printStackTrace();
 }
 }
 }
}

publicvoidasyncConsume(){
try{
 connection = ConnectionUtils.getConnectionFactory().createConnection();
 connection.start();
 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
 destination = session.createQueue("PTPSyncQueueProducer");
 messageConsumer = session.createConsumer(destination);

 messageConsumer.setMessageListener(newQueueMessageListener());
 } catch(JMSException e) {
 e.printStackTrace();
 }
 }

总结

虽然PTP 模式,规定了一个消息( Message )只能由一个消费者消费,但是并不意味着一个PTP 的 Destination 只能连接一个消费者。因此,PTP 模式典型应用由:

  • 应用到将同一件事情分散到多个处理者。
  • 订单处理系统,如定时抢购,银行纪念币抢兑
  • 一对一系统间调用的解耦
  • 基于PTP 实现 request-response 应答模式
分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址