使用 Spring Boot集成 RabbitMq
Lei Chu Lv4

简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。

rabbitmq

使用场景

场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种

  • 串行的方式

串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

  • 并行的方式

并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。

假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.

应用解耦

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

这种做法有一个缺点:

当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱^ ^)
订单系统和库存系统高耦合.
引入消息队列

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,获取下单消息,进行库操作。
就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失(马云这下高兴了).

流量削峰

流量削峰一般在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:

  1. 可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)

  2. 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

  3. 用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.

  4. 秒杀业务根据消息队列中的请求信息,再做后续处理.

SpringBoot 集成RabbitMq

添加依赖pom.xml

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加数据库连接application.yml

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: youhostname
port: 5672
username: test
password: dev.5566
virtual-host: /
publisher-returns: true
template:
mandatory: true

添加topic config持久化该队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class RabbitConfig {
//声明队列
@Bean
public Queue queue1() {
return new Queue("hello.queue1", true); // true表示持久化该队列
}

@Bean
public Queue queue2() {
return new Queue("hello.queue2", true);
}

//声明交互器
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}

//绑定
@Bean
public Binding binding1() {
return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
}

@Bean
public Binding binding2() {
return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
}
}

发送消息message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
public class Sender implements ConfirmCallback, ReturnCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b) {
System.out.println("消息发送成功:" + correlationData);
} else {
System.out.println("消息发送失败:" + s);
}
}

@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {

}

public void send(String msg){

CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());

System.out.println("开始发送消息 : " + msg.toLowerCase());
String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString();
System.out.println("结束发送消息 : " + msg.toLowerCase());
System.out.println("消费者响应 : " + response + " 消息处理完成");
}
}

消息接受方

1
2
3
4
5
6
7
8
9
10
11
@Component
public class Receiver {

@RabbitListener(queues = "hello.queue1")
public String processMessage1(String msg) {
System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
return msg.toUpperCase();
}

}

controller类

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
public class RabbitMqController {

@Autowired
private Sender sender;

@RequestMapping(value = "/rabbit", method = RequestMethod.GET)
public void rabbit() {
sender.send("ss");
}

}

测试结果

1
2
3
4
开始发送消息 : ss
SimpleAsyncTaskExecutor-1 接收到来自hello.queue1队列的消息:ss
结束发送消息 : ss
消费者响应 : SS 消息处理完成