RabbitMQ使用
一、 Windows安装教程
下载并安装erlang:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。
下载地址:http://www.erlang.org/downloads
1.对应系统版本下载:
2.下载后就一路下一步,然后就是配置Erlang环境变量:
添加系统环境变量ERLANG_HOME,值为安装目录
修改系统环境变量Path,在PATH变量中添加“%ERLANG_HOME%\bin”
3.添加成功后运行erl,查看是否添加成功
erl
4.下载RabbitMQ,对应自己的系统下载
下载地址:http://www.rabbitmq.com/download.html
5.安装,默认一路下一步。
6.启动服务
进入到安装目录sbin目录,执行“rabbitmq-plugins enable rabbitmq_management”命令
rabbitmq-plugins enable rabbitmq_management
再输入“ rabbitmqctl status”命令
然后双击"rabbitmq-server.bat"
会弹出一个黑窗口不要关闭
使用浏览器登录管理页面:http://127.0.0.1:15672/
使用用户名密码(guest guest)登录
二、Docker安装
1.创建容器
docker run -it -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
2.端口映射说明
https://www.rabbitmq.com/networking.html
3.启动访问
用户名,密码都是:guest
三、ubuntu安装
1. 配置环境,进入官网
https://www.rabbitmq.com/#getstarted
查看与erlang对应关系,安装时需要版本对应
进入ubuntu安装页面
2. 将脚本复制下来,到sudo apt-get update -y就可以了,后面安装部分我们手动安装
3.创建vim install-mq.sh 文件,将内容粘贴进去,并给与执行权限
chmod o=rxw,u=rwx,g=rwx install-mq.sh
4.执行 ./install-mq.sh 文件
5.安装erlang,查看可以安装的版本
apt list -a erlang
6.选择版本进行安装
sudo apt-get install -y erlang=1:26.1.2-1
7.安装RabbitMQ,查看可以安装的版本
apt list -a rabbitmq-server
8.选择版本安装
sudo apt-get install -y rabbitmq-server=3.12.12-1
9.安装管理界面
rabbitmq-plugins enable rabbitmq_management
10. 访问页面
11.添加远程账号
rabbitmqctl add_user username password
12.为账号设置角色
rabbitmqctl set_user_tags admin administrator
角色说明:
administrator:超级管理员角色,可以登录控制台查看所有信息,并可以对用户、策略操作
monitoring:监控者角色,可以登录控制台查看rabbitmq节点相关信息,无法对策略管理
management:普通管理者角色,仅可以登录控制台,无法看到节点信息
13.设置权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
14.其他操作
#停止
rabbitmqctl stop
#重启
rabbitmq-server -detached
rabbitmqctl start_app
#状态
rabbitmqctl status
#查看账户列表
rabbitmqctl list_users
#删除角色
rabbitmqctl delete_user username
#修改密码
rabbitmqctl change_password username password
四、集群
1.拷贝主服务器cookie文件到其他机器
scp /var/lib/rabbitmq/.erlang.cookie root@192.168.137.102:/var/lib/rabbitmq/
2.所有机器重启
rabbitmq-server -detached
3.节点2上面执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@主机名1
rabbitmqctl start_app
4.节点3上面执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@j节点2
rabbitmqctl start_app
5.查看集群状态
rabbitmqctl cluster_status
6.解除集群节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2(在node1节点上面执行)
7.镜像队列
8.标识启动
查看标识:
rabbitmqctl list_feature_flags
启动:
rabbitmqctl enable_feature_flag all
9.端口开通
管理控制台:15672
客户端:5672
集群通信:25672
五、控制台使用
1.功能介绍
2.交换机类型
3.交换机创建
4.队列创建
5.点击交换机添加绑定
6.Topic交换机绑定
六、Spring Boot使用
1.配置文件
spring:
#RabbitMQ配置
rabbitmq:
host: 192.168.137.188
port: 5672
username: guest
password: guest
#开启消息抵达服务器确认-交互模式,publisher-confirms过时
publisher-confirm-type: correlated
#开启消息抵达队列确认
publisher-returns: true
#只要抵达队列,以异步方式回调我们returnconfirm
template:
mandatory:
#手动ACK消息,不让它自动回复,不然消息未成功处理消息就丢失了
listener:
simple:
acknowledge-mode: manual
2.依赖引用
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.配置类
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitMQConfig {
@Bean
/**
* 对象格式化Json
*/
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@RequiredArgsConstructor
public class RabbitTemplateConfig {
private final RabbitTemplate rabbitTemplate;
/**
* 定制RabbitTemplate
* @PostConstruct:对象创建完成以后执行这个方法
*/
@PostConstruct
public void initRabbitTemplate(){
/**
* 设置抵达服务器确认回调
*
* 配置:
* spring.rabbitmq.publisher-confirm-type: correlated
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要消息抵达服务器 ack=true
* @param correlationData:当前消息的唯一关联数据(这个消息的唯一ID)
* @param ack:消息是否成功搜道
* @param cause:失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("--------------[消息抵达服务器回调]----------------------");
System.out.println("ID:"+correlationData+",是否成功:"+ack+",失败原因:"+cause);
}
});
/**
* 设置消息未抵达队列的回调
*
* 配置:
* #开启消息抵达队列确认
* spring.rabbitmq.publisher-returns: true
* #只要抵达队列,以异步方式回调我们returnconfirm
* spring.rabbitmq.template.mandatory: true
*/
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
/**
* 只要消息没有投递到指定队列,就触发这个失败回调
* returnedMessage.getMessage():投递失败的消息详细信息
* returnedMessage.getReplyCode():回复的状态码
* returnedMessage.getReplyText():回复的文本内容
* returnedMessage.getExchange():当时这个消息发给哪个交换机
* returnedMessage.getRoutingKey():当时这个消息用的哪个路由键
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("=============[消息未抵达队列回调]======================");
System.out.println("getMessage:"+returnedMessage.getMessage());
System.out.println("getReplyCode:"+returnedMessage.getReplyCode());
System.out.println("getReplyText:"+returnedMessage.getReplyText());
System.out.println("getExchange:"+returnedMessage.getExchange());
System.out.println("getRoutingKey:"+returnedMessage.getRoutingKey());
}
});
}
}
4.测试类发送消息
import com.cpc.user.entity.CustomerInfoUserEntity;
import com.cpc.utils.emailUtils.EmailUtil;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class EmailApplicationTests {
@Autowired
private EmailUtil emailService;
@Autowired
AmqpAdmin amqpAdmin;
//创建交换机
@Test
public void createExchanges(){
TopicExchange topicExchange=new TopicExchange("lry.test",true,false);
amqpAdmin.deleteExchange("lry.test");
amqpAdmin.declareExchange(topicExchange);
}
//创建交换机
@Test
public void createQueues(){
Queue queue=new Queue("lry.test",true);
amqpAdmin.deleteQueue("lry.test");
amqpAdmin.declareQueue(queue);
}
//交换机绑定队列
@Test
public void exchangeBindQueue(){
//String destination=目的地
//DestinationType destinationType=绑定类型
//String exchange=交换机名称
//String routingKey=路由键
// @Nullable Map<String, Object> arguments=参数
Binding binding=new Binding("lry.test", Binding.DestinationType.QUEUE,"lry.test","#.test",null);
amqpAdmin.removeBinding(binding);
amqpAdmin.declareBinding(binding);
}
@Autowired
RabbitTemplate rabbitTemplate;
//发送消息
@Test
public void sendMesage(){
CustomerInfoUserEntity ciue=new CustomerInfoUserEntity();
ciue.setGid("aaa");
ciue.setPoc("POC");
ciue.setRegion("CPC");
ciue.setUser_name("aaaaaaa");
rabbitTemplate.convertAndSend("lry.test","#.test",ciue);
}
}
5.消息接收监听
import com.cpc.email.service.RabbitMQAlertMailService;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequiredArgsConstructor
@RequestMapping("/mq")
public class RabbitMQAlertMailController {
private final RabbitMQAlertMailService service;
@RequestMapping("/sendMessage")
@ResponseBody
public String sendMessage(@RequestParam(value="num",required = false,defaultValue = "10") Integer num){
return service.sendMessage(num);
}
}
import com.cpc.email.entity.AlertEmailInfoEntity;
import com.cpc.user.entity.CustomerInfoUserEntity;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.UUID;
@Service
@RequiredArgsConstructor
@RabbitListener(queues = {"lry.test"})
public class RabbitMQAlertMailService {
private final RabbitTemplate rabbitTemplate;
public String sendMessage(Integer num){
for(int i=0;i<num;i++){
CustomerInfoUserEntity ciue=new CustomerInfoUserEntity();
AlertEmailInfoEntity aeie=new AlertEmailInfoEntity();
if(i%3==0){//成功:类型是CustomerInfoUserEntity
ciue.setGid("GID-"+i);
ciue.setPoc("POC-"+i);
ciue.setRegion("CPC-"+i);
ciue.setUser_name("Name-"+i);
rabbitTemplate.convertAndSend("lry.test","#.test",ciue,new CorrelationData(UUID.randomUUID().toString()));
}else if(i%4==0){ //成功:类型是AlertEmailInfoEntity
aeie.setGid("GID-"+i);
rabbitTemplate.convertAndSend("lry.test","#.test",aeie,new CorrelationData(UUID.randomUUID().toString()));
}else if(i%5==0){//失败,交换机错误:进入ConfirmCallback
ciue.setGid("GID-"+i);
ciue.setPoc("POC-"+i);
ciue.setRegion("CPC-"+i);
ciue.setUser_name("Name-"+i);
rabbitTemplate.convertAndSend("lry.test1","#.test",ciue,new CorrelationData(UUID.randomUUID().toString()));
}else{//失败,路由键错误:进入ReturnsCallback
aeie.setGid("GID-"+i);
rabbitTemplate.convertAndSend("lry.test","#.tes00t",aeie,new CorrelationData(UUID.randomUUID().toString()));
}
}
return "OK";
}
//@RabbitListener(queues = {"lry.test"})
/**
* Message:原生消息详细信息,头+体
* CustomerInfoUserEntity:发送消息的类型
* Channel:当前传输数据的通道
*
* @RabbitListener:可以标注到类和方法上面
* @RabbitHandler:只能标注方法上面,如果多个类型(就是第二个参数 CustomerInfoUserEntity),可以把@RabbitListener标注在类上面,@RabbitHandler标注在方法上面。
*/
@RabbitHandler
public void recieveMessage(Message message, CustomerInfoUserEntity entity, Channel channel){
//消息头
MessageProperties messageProperties = message.getMessageProperties();
//消息体
byte[] body = message.getBody();
//System.out.println("消息头:"+messageProperties);
//System.out.println("消息体:"+body);
System.out.println("+++++++++++++++++[CustomerInfoUserEntity类型消息]++++++++++++++++++++++++++");
System.out.println("消息体Json[CustomerInfoUserEntity]:"+entity);
//手动ACK需要的参数,channel内按顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//确认消息,第二个参数是否批量确认
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
//只有网络中断才会报异常
e.printStackTrace();
}
}
@RabbitHandler
public void recieveMessage(Message message, AlertEmailInfoEntity entity, Channel channel){
//消息头
MessageProperties messageProperties = message.getMessageProperties();
//消息体
byte[] body = message.getBody();
//System.out.println("消息头:"+messageProperties);
//System.out.println("消息体:"+body);
System.out.println("//////////////////////[AlertEmailInfoEntity类型消息]//////////////////////////////");
System.out.println("消息体Json[AlertEmailInfoEntity]:"+entity);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//拒绝:参数一:deliveryTag,参数二:是否批量拒绝,参数三:是否重新回到队列,如果是false就是直接丢弃这个消息
channel.basicNack(deliveryTag,false,true);
} catch (IOException e) {
//只有网络中断才会报异常
e.printStackTrace();
}
}
}
赞(1)
赏