简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。
使用场景
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种
串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.
并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.
应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
这种做法有一个缺点:
当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱^ ^)
订单系统和库存系统高耦合.
引入消息队列
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
库存系统:订阅下单的消息,获取下单消息,进行库操作。
就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失(马云这下高兴了).
流量削峰
流量削峰一般在秒杀活动中应用广泛
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
秒杀业务根据消息队列中的请求信息,再做后续处理.
SpringBoot 集成RabbitMq
添加依赖pom.xml
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
添加数据库连接application.yml
1 2 3 4 5 6 7 8 9 10
| spring: rabbitmq: host: youhostname port: 5672 username: test password: dev.5566 virtual-host: / publisher-returns: true template: mandatory: true
|
添加topic
config
持久化该队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Configuration public class RabbitConfig { @Bean public Queue queue1() { return new Queue("hello.queue1", true); }
@Bean public Queue queue2() { return new Queue("hello.queue2", true); }
@Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); }
@Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1"); }
@Bean public Binding binding2() { return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#"); } }
|
发送消息message
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Component public class Sender implements ConfirmCallback, ReturnCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean b, String s) { if (b) { System.out.println("消息发送成功:" + correlationData); } else { System.out.println("消息发送失败:" + s); } }
@Override public void returnedMessage(Message message, int i, String s, String s1, String s2) {
}
public void send(String msg){
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
System.out.println("开始发送消息 : " + msg.toLowerCase()); String response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", msg, correlationId).toString(); System.out.println("结束发送消息 : " + msg.toLowerCase()); System.out.println("消费者响应 : " + response + " 消息处理完成"); } }
|
消息接受方
1 2 3 4 5 6 7 8 9 10 11
| @Component public class Receiver {
@RabbitListener(queues = "hello.queue1") public String processMessage1(String msg) { System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg); return msg.toUpperCase(); }
}
|
controller类
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RestController public class RabbitMqController {
@Autowired private Sender sender;
@RequestMapping(value = "/rabbit", method = RequestMethod.GET) public void rabbit() { sender.send("ss"); }
}
|
测试结果
1 2 3 4
| 开始发送消息 : ss SimpleAsyncTaskExecutor-1 接收到来自hello.queue1队列的消息:ss 结束发送消息 : ss 消费者响应 : SS 消息处理完成
|