ActiveMQ 寻根究低 - 点对点模式详解

本文在 ActiveMQ 寻根究低 - JMS 规范概述 一问的基础上,探究点对点模式的具体变成实现方案,以及点对点模式在实际应用的的具体场景

本文在 ActiveMQ 寻根究低 - JMS 规范概述 一问的基础上,探究点对点模式的具体变成实现方案,以及点对点模式在实际应用的的具体场景。

JMS 程序设计模式

JMS 程序的模式主要是给予PTP 和 Pub/Sub 配置一些参数的不同来应用到不同的场景中。根据具体的业务对性能、对数据的安全性等再进行最终方案的选型。通用的结构如下图:

ActiveMQ 寻根究低 - 点对点模式详解

点对点(PTP)模式的实现

PTP 模式生产者

非事务方式的生产者

非事务的生产者需要将 connection.createSession(transacted, acknowledge) 的第一个参数设置为 FALSE 。第二个参数指明的消息接受者的应答方法。

classPTPSyncQueueProducerimplementsRunnable{privatefinalstaticLogger LOG = LoggerFactory.getLogger(PTPSyncQueueProducer.class); Connection connection; Session session; Destination destination; MessageProducer messageProducer;@Overridepublicvoidrun(){inti =0;while(true) {try{ connection = ConnectionUtils.getConnectionFactory().createConnection(); connection.start();// first param is transacted session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("PTPSyncQueueProducer"); messageProducer = session.createProducer(destination); ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setObject(newObj(++i,"littler Bai")); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); messageProducer.setPriority(Message.DEFAULT_PRIORITY); messageProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); messageProducer.send(objectMessage); LOG.info("send {}", i); TimeUnit.SECONDS.sleep(5); } catch(JMSException e) { ConnectionUtils.close(messageProducer, session, connection); } catch(InterruptedException e) { e.printStackTrace(); } } }}

基于事务的生成者

基于事务的生产者。只有再事务提交时,消息才已打包的方式一起发送到JMS Provider。

publicvoidtransactedProducer(){try{ connection = ConnectionUtils.getConnectionFactory().createConnection(); connection.start();// first param is transacted,if true transaction session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("PTPSyncQueueProducer"); messageProducer = session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); messageProducer.setPriority(Message.DEFAULT_PRIORITY); messageProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);for(inti =0; i <10; i++) { ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setObject(newObj(i,"littler Bai")); messageProducer.send(objectMessage); LOG.info("send {}", i); } System.out.println("begin sleep"); TimeUnit.SECONDS.sleep(10); session.commit(); } catch(JMSException e) {try{ session.rollback(); } catch(JMSException ex) { ex.printStackTrace(); } e.printStackTrace(); } catch(InterruptedException e) { e.printStackTrace(); } finally{ ConnectionUtils.close(messageProducer, session, connection); } }

PTP 模式消费者

基于阻塞的消费者

基于阻塞的消费者采用 MessageConsumer.receive(timeout) 方法来阻塞当前线程,循环获取监测的 Destination 的消息。

classPTPQueueCustomer{ Connection connection; Session session; Destination destination; MessageConsumer messageConsumer;publicvoidconsume(){try{ connection = ConnectionUtils.getConnectionFactory().createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("PTPSyncQueueProducer"); messageConsumer = session.createConsumer(destination);while(true) { Message message = messageConsumer.receive(1000);if(messageinstanceofObjectMessage) { ObjectMessage objectMessage = (ObjectMessage) message; System.out.println(objectMessage.getObject()); } } } catch(JMSException e) { e.printStackTrace(); } finally{ ConnectionUtils.close(messageConsumer, session, connection); } }publicstaticvoidmain(String[] args){//new PTPQueueCustomer().consume();newPTPQueueCustomer().asyncConsume(); }}

基于MessageListener 的异步消费者

基于 MessageListener 的异步消费者需要实现该接口,并在 MessageConsumer.setMessageListener(listener) 方法上注册。

classQueueMessageListenerimplementsMessageListener{@OverridepublicvoidonMessage(Message message){if(messageinstanceofObjectMessage) { ObjectMessage objectMessage = (ObjectMessage) message;try{ System.out.println(objectMessage.getObject()); } catch(JMSException e) { e.printStackTrace(); } } }}publicvoidasyncConsume(){try{ connection = ConnectionUtils.getConnectionFactory().createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("PTPSyncQueueProducer"); messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(newQueueMessageListener()); } catch(JMSException e) { e.printStackTrace(); } }

总结

虽然PTP 模式,规定了一个消息( Message )只能由一个消费者消费,但是并不意味着一个PTP 的 Destination 只能连接一个消费者。因此,PTP 模式典型应用由:

  • 应用到将同一件事情分散到多个处理者。
  • 订单处理系统,如定时抢购,银行纪念币抢兑
  • 一对一系统间调用的解耦
  • 基于PTP 实现 request-response 应答模式
未登录用户
全部评论0
到底啦