RabbitMQ 基础篇

  1. 1. 消息队列
    1. 1.1. RabbitMQ
    2. 1.2. 安装 下载 命令
    3. 1.3. Spring boot 集成
      1. 1.3.1. 配置文件
      2. 1.3.2. 生产者
      3. 1.3.3. 消费者

消息队列

单线程中间件,主要用于异步通知、消息分发、缓存、分布式事务等场景。

RabbitMQ

主要有Exchange 交换器 和 Queue 队列功能组件。
生产者会向Exchange发送消息并且绑定一个RoutingKey,
Exchange 用来接收生产者发送的消息并通过模式和规则将这些消息路由给服务器中的队列,
Exchange通过BindingKey找到匹配的队列,Queue 用来保存消息直到发送给消费者。

rabbitmq_model

四种运行模式

  • fanout

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。
fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,
每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。
很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout
类型转发消息是最快的,忽略RoutingKey
fanout 模式

  • direct

Exchange 通过比配RoutingKey和BindingKey相同的队列进行转发
direct 模式

  • topic

与direct类似,更加灵活,RoutingKey 类似com.aa.bb./com.aa,bb.# ,用于
匹配BindingKey复合的队列(“
”用于匹配一个单词,“#”用于匹配多个单词(可以是零个))
topic 模式

  • headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。

参考:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

安装 下载 命令

linux/centos : 需现安装Erlang 环境 再通过官网 rpm 文件安装或yum install

1
2
3
4
5
6
7
8
9
10
11
12
13
14
系统服务: server rabbitmq-server start
执行文件:
rabbitmq-server -detached # 使用守护进程方式启动
rabbitmq-server start # 使用阻塞方式启动
rabbitmqctl stop # 关闭rabbitmq
rabbitmqctl list_users # 查看后台管理员名单
rabbitmqctl list_queues # 查看当前的所有的队列
rabbitmqctl list_exchanges # 查看所有的交换机
rabbitmqctl list_bindings # 查看所有的绑定
rabbitmqctl list_connections # 查看所有的tcp连接
rabbitmqctl list_channels # 查看所有的信道
rabbitmqctl stop_app # 关闭应用
rabbitmqctl start_app # 打开应用
rabbitmqctl reset # 清空队列

Spring boot 集成

生成 Exchange 和 Queue 可以通过ip:15672 管理页面手动添加,也可以通过项目启动配置文件生成,
两种方式取并集。消息类型可以发送json字符串或对象。

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.dzdy.dcloud.dcloud.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
* @author : wangzhiyong
* @date : 2019/1/17 14:47
* description : fanout 模式
*/
//@Configuration
public class FanoutRabbitConfig {

@Bean
public Queue fanoutQueueA(){
return new Queue("dcloud.fanout.queue");
}

@Bean
public Queue fanoutQueueB() {
return new Queue("dcloud.fanout.queue2");
}

@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("dcloud.fanout");
}

@Bean
public Binding bindingExchangeA(Queue fanoutQueueA,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
}

@Bean
public Binding bindingExchangeB(Queue fanoutQueueB,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.dzdy.dcloud.dcloud.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author : wangzhiyong
* @date : 2019/1/17 16:26
* description :
*/
//@Configuration
public class DirectRabbitConfig {

@Bean
public DirectExchange directExchange(){
return new DirectExchange("dcloud.direct");
}

@Bean
public Queue directQueueA(){
return new Queue("dcloud.direct.queue1");
}

@Bean
public Queue directQueueB() {
return new Queue("dcloud.direct.queue2");
}

@Bean
public Binding bindingExchangeA(Queue directQueueA, DirectExchange directExchange){
return BindingBuilder.bind(directQueueA).to(directExchange).with("aa");
}

@Bean
public Binding bindingExchangeBA(Queue directQueueB,DirectExchange directExchange){
return BindingBuilder.bind(directQueueB).to(directExchange).with("aa");
}

@Bean
public Binding bindingExchangeBB(Queue directQueueB,DirectExchange directExchange){
return BindingBuilder.bind(directQueueB).to(directExchange).with("bb");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.dzdy.dcloud.dcloud.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;

/**
* @author : wangzhiyong
* @date : 2019/1/17 16:54
* description :
*/
//@Configuration
public class TopicRabbitConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("dcloud.topic");
}

@Bean
public Queue topicQueueA() {
return new Queue("dcloud.topic.queue10");
}

@Bean
public Queue topicQueueB() {
return new Queue("dcloud.topic.queue11");
}

@Bean
public Binding bindingExchangeA(Queue topicQueueA, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueA).to(topicExchange).with("com.ys");
}

//星号(*) :只能匹配一个单词 井号(#):可以匹配0个或多个单词
@Bean
public Binding bindingExchangeBA(Queue topicQueueB, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueB).to(topicExchange).with("com.dzdy.#");
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.dzdy.dcloud.dcloud;

import com.alibaba.fastjson.JSONObject;
import com.dzdy.dcloud.dcloud.vo.MqUserVo;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class DcloudMqApplicationTests {

@Resource
private AmqpTemplate rabbitTemplate;

private MqUserVo userVo = new MqUserVo("张氏", 12, 33.23, LocalDateTime.now());
@Test
public void contextLoads() {
}

// config 可以不用配置 如果配置会和网页配置的想重合

@Test
public void fanoutSendMsg() {
// rabbitTemplate.convertAndSend("dcloud.fanout","",new MqUserVo("张氏",12,33.23,LocalDateTime.now()));
rabbitTemplate.convertAndSend("dcloud.fanout", "", JSONObject.toJSONString(userVo));
// 直接发送到队列
// rabbitTemplate.convertAndSend("dcloud.fanout.queue","hello mq");
log.info("fanoutSendMsg - success");
}

@Test
public void directSendMsg() {
// 必须匹配 routingKey 不能为空
rabbitTemplate.convertAndSend("dcloud.direct", "bb", JSONObject.toJSONString(userVo));
log.info("directSendMsg - success");
}

@Test
public void topicSendMsg(){
List<MqUserVo> userVos = new ArrayList<>();
userVos.add(userVo);
userVos.add(userVo);
userVos.add(userVo);
rabbitTemplate.convertAndSend("dcloud.topic","com.dzdy",userVos);
log.info("topicSendMsg - success");
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.dzdy.dcloud.dcloud.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author : wangzhiyong
* @date : 2019/1/17 15:03
* description :
*/
@Component
@RabbitListener(queues = "dcloud.direct.queue1")
public class DirectReceiverA {

@RabbitHandler
public void process(String message) {
System.out.println("receive3" + message);
}

// @RabbitHandler
// public void process(MqUserVo mqUserVo) {
// System.out.println("receive" + mqUserVo.toString());
// }
}
支付宝打赏 微信打赏

如果文章对你有帮助,欢迎点击上方按钮打赏作者