在上一篇里,我们讲了rabbitMQ的通信流程。接下来,通过代码来实现exchange的几种类型,看一下rabbitMQ是如何运用的。
exchange几种类型的实现方式
fanout
如果一个exchange的类型被定义为fanout,那么发送到该exchange上的消息会发送给所有监听该exchange的queue中。Fanout类型的exchange不需要处理routingkey,只需要队列监听该exchange就能接收到消息,类似于广播。
接下来,来看一下代码中的实现,首先是消息的消费者
|
|
- 消息发送者
|
|
direct
发送到类型为direct的exchange的消息,会根据routingkey来判断消息发送给哪个queue。每个绑定该exchange的queue都需要绑定一个routingkey。如果接收到的消息不符合绑定的每一个routingkey,则消息会被抛弃掉。
- 消息的消费者:和之前的消费者代码差不多,只是多了绑定routingKey这一步和设置类型改为direct
···
/*
创建一个名为cplcqueue的绑定了3个routingkey的queue,监听direct类型的名为directExchange的exchange。
/
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = “directExchange”;
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
String queueName = "cplcqueue";
String[] a = new String[3];
a[0] = "cplc_vm_cachedFileRequest35";
a[1] = "cplc_vm_downloadRequest35";
a[2] = "cplc_vm_delFileRequest35";
if(a.length <1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for(int i=0;i<3;i++){
channel.queueBind(queueName, EXCHANGE_NAME, a[i]);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
···
- 消息生产者
|
|
topic
发送到类型为topic的exchange的消息,会将消息转发到所有匹配topic的queue上。该exchange会将routingKey与某topic进行模糊匹配,队列需要绑定一个一个topic,可以使用通配符进行模糊匹配,“#”匹配一个或多个,而“”只匹配一个词。如“log.#”可以匹配“log.info.abc”,而“log.”则只会匹配到类似“log.abc”这样后面只有一个词的routingkey。
topic的消息消费者实现
|
|
消息发送者实现
|
|
headers
这个类型的exchange基本不会用到,主要是根据发送的消息中所带的header这个参数来判断的。没有进行深入的了解,就不说了。
RPC的实现
RPC(远程进程调用)的主要过程是,消费者接收到消息后在对消息进行处理后把处理结果与收到消息的反馈一并发回服务端。这与exchange是哪种类型是无关的,所以,任何一种类型的exchange都可以用来实现rpc模式。
消息发送者:与普通的消息发送者的区别在于,事先设置不立即对接收到的消息进行反馈,而是发送完消息后监听消息队列,等待返回结果,直到收到结果才关闭通道和连接。
|
|
消息消费者:处理完消息后,把应答机制设置为true,把处理结果和消息反馈一起发给服务器
|
|
注:上述代码测试,如果是先运行生产者的话,生产的消息因为还没有queue监听,会暂存在server上,然后再运行消费者的话,server会自动把消息分配给消费者。所以,不管是先运行消费者或者生产者都可以。