ActiveMQ 寻根究低 - 点对点模式详解

本文在 ActiveMQ 寻根究低 - JMS 规范概述 一问的基础上,探究点对点模式的具体变成实现方案,以及点对点模式在实际应用的的具体场景

本文在 ActiveMQ 寻根究低 - JMS 规范概述 一问的基础上,探究点对点模式的具体变成实现方案,以及点对点模式在实际应用的的具体场景。

JMS 程序设计模式

JMS 程序的模式主要是给予PTP 和 Pub/Sub 配置一些参数的不同来应用到不同的场景中。根据具体的业务对性能、对数据的安全性等再进行最终方案的选型。通用的结构如下图:

ActiveMQ 寻根究低 - 点对点模式详解

点对点(PTP)模式的实现

PTP 模式生产者

非事务方式的生产者

非事务的生产者需要将 connection.createSession(transacted, acknowledge) 的第一个参数设置为 FALSE 。第二个参数指明的消息接受者的应答方法。

classPTPSyncQueueProducerimplementsRunnable{

privatefinalstaticLogger LOG = LoggerFactory.getLogger(PTPSyncQueueProducer.class);

Connection connection;

Session session;

Destination destination;

MessageProducer messageProducer;

@Override

publicvoidrun(){

inti =0;

while(true) {

try{

connection = ConnectionUtils.getConnectionFactory().createConnection();

connection.start();

// first param is transacted

session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue("PTPSyncQueueProducer");

messageProducer = session.createProducer(destination);

ObjectMessage objectMessage = session.createObjectMessage();

objectMessage.setObject(newObj(++i,"littler Bai"));

messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

messageProducer.setPriority(Message.DEFAULT_PRIORITY);

messageProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

messageProducer.send(objectMessage);

LOG.info("send {}", i);

TimeUnit.SECONDS.sleep(5);

} catch(JMSException e) {

ConnectionUtils.close(messageProducer, session, connection);

} catch(InterruptedException e) {

e.printStackTrace();

}

}

}

}

基于事务的生成者

基于事务的生产者。只有再事务提交时,消息才已打包的方式一起发送到JMS Provider。

publicvoidtransactedProducer(){

try{

connection = ConnectionUtils.getConnectionFactory().createConnection();

connection.start();

// first param is transacted,if true transaction

session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue("PTPSyncQueueProducer");

messageProducer = session.createProducer(destination);

messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

messageProducer.setPriority(Message.DEFAULT_PRIORITY);

messageProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);

for(inti =0; i <10; i++) {

ObjectMessage objectMessage = session.createObjectMessage();

objectMessage.setObject(newObj(i,"littler Bai"));

messageProducer.send(objectMessage);

LOG.info("send {}", i);

}

System.out.println("begin sleep");

TimeUnit.SECONDS.sleep(10);

session.commit();

} catch(JMSException e) {

try{

session.rollback();

} catch(JMSException ex) {

ex.printStackTrace();

}

e.printStackTrace();

} catch(InterruptedException e) {

e.printStackTrace();

} finally{

ConnectionUtils.close(messageProducer, session, connection);

}

}

PTP 模式消费者

基于阻塞的消费者

基于阻塞的消费者采用 MessageConsumer.receive(timeout) 方法来阻塞当前线程,循环获取监测的 Destination 的消息。

classPTPQueueCustomer{

Connection connection;

Session session;

Destination destination;

MessageConsumer messageConsumer;

publicvoidconsume(){

try{

connection = ConnectionUtils.getConnectionFactory().createConnection();

connection.start();

session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue("PTPSyncQueueProducer");

messageConsumer = session.createConsumer(destination);

while(true) {

Message message = messageConsumer.receive(1000);

if(messageinstanceofObjectMessage) {

ObjectMessage objectMessage = (ObjectMessage) message;

System.out.println(objectMessage.getObject());

}

}

} catch(JMSException e) {

e.printStackTrace();

} finally{

ConnectionUtils.close(messageConsumer, session, connection);

}

}

publicstaticvoidmain(String[] args){

//new PTPQueueCustomer().consume();

newPTPQueueCustomer().asyncConsume();

}

}

基于MessageListener 的异步消费者

基于 MessageListener 的异步消费者需要实现该接口,并在 MessageConsumer.setMessageListener(listener) 方法上注册。

classQueueMessageListenerimplementsMessageListener{

@Override

publicvoidonMessage(Message message){

if(messageinstanceofObjectMessage) {

ObjectMessage objectMessage = (ObjectMessage) message;

try{

System.out.println(objectMessage.getObject());

} catch(JMSException e) {

e.printStackTrace();

}

}

}

}

publicvoidasyncConsume(){

try{

connection = ConnectionUtils.getConnectionFactory().createConnection();

connection.start();

session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

destination = session.createQueue("PTPSyncQueueProducer");

messageConsumer = session.createConsumer(destination);

messageConsumer.setMessageListener(newQueueMessageListener());

} catch(JMSException e) {

e.printStackTrace();

}

}

总结

虽然PTP 模式,规定了一个消息( Message )只能由一个消费者消费,但是并不意味着一个PTP 的 Destination 只能连接一个消费者。因此,PTP 模式典型应用由:

  • 应用到将同一件事情分散到多个处理者。
  • 订单处理系统,如定时抢购,银行纪念币抢兑
  • 一对一系统间调用的解耦
  • 基于PTP 实现 request-response 应答模式

未登录用户
全部评论0
到底啦