RabbitMQ 3.12.2: A Practical Guide to Single-Node and Cluster Deployment
- 2025-09-03 16:39:02

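Preface: Message queues have become an essential component of modern distributed architectures. They decouple services and improve scalability and reliability. RabbitMQ, a widely used open-source message broker, is a common choice for developers and enterprises thanks to its high availability, flexible routing, and rich plugin ecosystem.

The RabbitMQ 3.12 series, of which 3.12.2 is a patch release, improves performance and stability, and it suits both small projects and large production environments. This article walks through deploying RabbitMQ 3.12.2 on Linux as a single node and as a cluster, so that readers can set up a development environment quickly and use it as a reference for production deployments.

The guide is aimed at beginners and experienced developers alike. It starts with environment preparation, moves on to single-node and cluster configuration, and closes with operational topics such as tuning and monitoring.

RabbitMQ 3.12.2 single-node deployment

Target environment: CentOS 7+ / Ubuntu 20.04+ | memory ≥ 2 GB | disk ≥ 10 GB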
I. Pre-deployment preparation

1. System environment

CentOS:
    sudo yum install -y epel-release
    sudo yum install -y socat logrotate

Ubuntu:
    sudo apt update
    sudo apt install -y socat logrotate init-system-helpers

2. Erlang installation

Add the Erlang repository (CentOS shown) and install a version that is compatible with RabbitMQ 3.12:
    curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
    sudo yum install -y erlang-25.3.2-1.el7.x86_64
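The compatibility check mentioned above can be scripted. The sketch below assumes that RabbitMQ 3.12.x runs on Erlang/OTP 25 and 26; that range is an assumption of this example, so confirm it against the official RabbitMQ/Erlang compatibility matrix for your exact patch release. The function names are made up for illustration.

```bash
#!/usr/bin/env bash
# Hypothetical helper: succeeds if the given Erlang/OTP major version is in the
# range this sketch ASSUMES RabbitMQ 3.12.x supports (25 or 26).
erlang_ok_for_rabbitmq_312() {
    case "$1" in
        25|26) return 0 ;;
        *)     return 1 ;;
    esac
}

# Print the OTP major release of the locally installed Erlang; fails if erl is absent.
installed_otp_major() {
    command -v erl >/dev/null 2>&1 || return 1
    erl -noshell -eval 'io:format("~s", [erlang:system_info(otp_release)]), halt().'
}

if major=$(installed_otp_major); then
    if erlang_ok_for_rabbitmq_312 "$major"; then
        echo "Erlang/OTP $major: within the range assumed for RabbitMQ 3.12.x"
    else
        echo "Erlang/OTP $major: outside the range assumed here (25-26)"
    fi
else
    echo "erl not found; install Erlang first"
fi
```

Running it before installing rabbitmq-server catches a mismatched Erlang early, which is cheaper than debugging a broker that refuses to start.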
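II. Installing RabbitMQ

1. Install the server

Add the RabbitMQ repository and install the package. Note that the Erlang package above is an el7 build while this one is an el8 build; choose the builds that match your distribution.
    curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
    sudo yum install -y rabbitmq-server-3.12.2-1.el8.noarch

2. Service management

Start the service, enable it at boot, and enable the web management console:
    sudo systemctl start rabbitmq-server
    sudo systemctl enable rabbitmq-server
    sudo rabbitmq-plugins enable rabbitmq_management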
III. Basic configuration

1. Firewall

    sudo firewall-cmd --permanent --add-port={5672/tcp,15672/tcp,25672/tcp}
    sudo firewall-cmd --reload

2. Accounts

Create an administrator and grant it full permissions on the default virtual host:
    sudo rabbitmqctl add_user admin your_strong_password
    sudo rabbitmqctl set_user_tags admin administrator
    sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
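The three account commands can be wrapped in one function so they are applied together and can be previewed first. This is a minimal sketch: `create_admin` and the `RABBITMQCTL` override variable are names invented for this example, not part of RabbitMQ.

```bash
#!/usr/bin/env bash
# RABBITMQCTL is a hypothetical override for this sketch: set it to "echo"
# to print the commands instead of running them (a dry run).
RABBITMQCTL="${RABBITMQCTL:-sudo rabbitmqctl}"

# Create an administrator with full configure/write/read permissions on a vhost.
# Stops at the first failing step.
create_admin() {
    local user="$1" password="$2" vhost="${3:-/}"
    $RABBITMQCTL add_user "$user" "$password" &&
    $RABBITMQCTL set_user_tags "$user" administrator &&
    $RABBITMQCTL set_permissions -p "$vhost" "$user" ".*" ".*" ".*"
}
```

For a dry run, set `RABBITMQCTL=echo` and call `create_admin admin 'your_strong_password'`; the three rabbitmqctl argument lists are printed instead of executed.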
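IV. Performance tuning

1. File descriptor limit

Edit /etc/systemd/system/rabbitmq-server.service.d/limits.conf:
    [Service]
    LimitNOFILE=300000

Then reload systemd and restart the service so the new limit takes effect:
    sudo systemctl daemon-reload
    sudo systemctl restart rabbitmq-server

2. Log management

Edit /etc/rabbitmq/rabbitmq.conf (comments go on their own line; the rotation size below is 100 MB):
    log.connection.level = info
    # rotate the log file at 100 MB
    log.file.rotation.size = 100000000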
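After the restart it is worth confirming that the running broker actually received the new limit. The sketch below is Linux-only and reads the soft limit from /proc; the helper names are made up for this example, and it assumes that the unit's MainPID is the broker process.

```bash
#!/usr/bin/env bash
# Succeeds when the current limit is at least the required one.
# "unlimited" always satisfies the requirement.
fd_limit_ok() {
    local current="$1" required="$2"
    [ "$current" = "unlimited" ] && return 0
    [ "$current" -ge "$required" ]
}

# Print the "Max open files" soft limit of a running process (Linux /proc only).
soft_nofile_of_pid() {
    awk '/^Max open files/ {print $4}' "/proc/$1/limits"
}

# Only attempt the live check where systemd and the service are present.
if command -v systemctl >/dev/null 2>&1; then
    pid=$(systemctl show -p MainPID rabbitmq-server 2>/dev/null | cut -d= -f2)
    if [ -n "$pid" ] && [ "$pid" != 0 ] && [ -r "/proc/$pid/limits" ]; then
        limit=$(soft_nofile_of_pid "$pid")
        if fd_limit_ok "$limit" 300000; then
            echo "open-file limit OK: $limit"
        else
            echo "open-file limit too low: $limit (expected >= 300000)"
        fi
    fi
fi
```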
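V. Security hardening

1. Change the default ports

In /etc/rabbitmq/rabbitmq.conf:
    listeners.tcp.default = 5673
    management.tcp.port = 15673

Open the new ports in the firewall if you change them.

2. TLS configuration (optional)

A TLS listener needs the CA bundle, the server certificate, and the matching private key:
    listeners.ssl.default = 5671
    ssl_options.cacertfile = /path/to/ca_certificate.pem
    ssl_options.certfile = /path/to/server_certificate.pem
    ssl_options.keyfile = /path/to/server_key.pem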
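A TLS listener that lacks one of its certificate settings tends to fail only at startup, so a quick pre-flight check of the config file can save a restart cycle. The following is a sketch; `missing_tls_keys` is a hypothetical helper, and the list of required keys reflects the settings discussed in this section rather than every TLS option RabbitMQ supports.

```bash
#!/usr/bin/env bash
# Print each required TLS key that has no "key = value" line in the config file.
missing_tls_keys() {
    local conf="$1" key
    for key in listeners.ssl.default \
               ssl_options.cacertfile \
               ssl_options.certfile \
               ssl_options.keyfile; do
        # Escape the dots so they match literally in the regular expression.
        grep -Eq "^[[:space:]]*${key//./\\.}[[:space:]]*=" "$conf" || echo "$key"
    done
}
```

Calling `missing_tls_keys /etc/rabbitmq/rabbitmq.conf` prints nothing when all four settings are present.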
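VI. Verification and monitoring

1. Service status

    sudo rabbitmqctl status | grep -iE 'uptime|memory'

2. Management UI

Open http://[server IP]:15672 in a browser (use 15673 if you changed management.tcp.port in the security-hardening step).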
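VII. Troubleshooting

Follow the log in real time (the node name normally uses the short hostname):
    tail -f /var/log/rabbitmq/rabbit@$(hostname -s).log

Check which process is using the port:
    ss -tulnp | grep 5672

Check node health:
    rabbitmq-diagnostics check_running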
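The single health check above can be extended into a small routine that runs several rabbitmq-diagnostics checks and reports each result. This is a sketch under assumptions: `run_health_checks` and the `DIAG` override variable are invented for this example, while `ping`, `check_running`, `check_local_alarms`, and `check_port_connectivity` are rabbitmq-diagnostics subcommands.

```bash
#!/usr/bin/env bash
# DIAG is a hypothetical override for this sketch; it lets the function be
# exercised with a stub command on machines without RabbitMQ installed.
DIAG="${DIAG:-sudo rabbitmq-diagnostics}"

# Run a fixed list of health checks; print OK/FAIL per check and
# return non-zero if any of them failed.
run_health_checks() {
    local check failed=0
    for check in ping check_running check_local_alarms check_port_connectivity; do
        if $DIAG -q "$check" >/dev/null 2>&1; then
            echo "OK   $check"
        else
            echo "FAIL $check"
            failed=1
        fi
    done
    return "$failed"
}
```

Because the function's exit status reflects the overall result, it can be called from cron or a monitoring agent, for example `run_health_checks || echo "RabbitMQ needs attention"`.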
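Deployment summary: the steps above produce a single-node deployment with production-grade settings. Recommendations:

- Back up the /var/lib/rabbitmq directory regularly.
- Use the rabbitmqadmin tool for day-to-day management.
- Monitor the memory watermark (recommended ≤ 70%).

RabbitMQ 3.12.2 cluster deployment

I. Environment preparation

1. Server planning

Plan the cluster nodes before deploying. This guide uses the following layout:

Role | IP address | Open ports
RMQ-Master | 192.168.1.101 | 5672, 15672, 25672, 4369, 9100-9105
RMQ-Node1 | 192.168.1.102 | same as above
RMQ-Node2 | 192.168.1.103 | same as above

2. Prerequisites

Run the following on every node to install the required tools and dependencies: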
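    sudo apt-get update
    sudo apt-get install -y socat logrotate hostname

II. Erlang installation

RabbitMQ is written in Erlang, so Erlang must be installed first. The steps below target Ubuntu 22.04.

Add the Erlang repository. The repository line uses the focal codename (Ubuntu 20.04); on 22.04 use jammy instead, provided the repository publishes packages for it:
    wget -O- https://packages.erlang-solutions.com/ubuntu/erlang_solutions.asc | sudo apt-key add -
    echo "deb https://packages.erlang-solutions.com/ubuntu focal contrib" | sudo tee /etc/apt/sources.list.d/rabbitmq.list

Install Erlang. These commands install whatever version the repository currently offers, so pin the package version if you need exactly 25.3:
    sudo apt-get update
    sudo apt-get install -y erlang erlang-nox

III. RabbitMQ installation and configuration

1. Install the server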
Install the RabbitMQ server package:
    curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash
    sudo apt-get install -y rabbitmq-server=3.12.2-1

2. Basic configuration

Enable the management plugin:
    sudo rabbitmq-plugins enable rabbitmq_management

Create the configuration file /etc/rabbitmq/rabbitmq.conf. The hostnames rmq-master, rmq-node1, and rmq-node2 must resolve on every node, for example through /etc/hosts:
    sudo tee /etc/rabbitmq/rabbitmq.conf <<EOF
    listeners.tcp.default = 5672
    management.tcp.port = 15672
    cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
    cluster_formation.classic_config.nodes.1 = rabbit@rmq-master
    cluster_formation.classic_config.nodes.2 = rabbit@rmq-node1
    cluster_formation.classic_config.nodes.3 = rabbit@rmq-node2
    EOF

IV. Cluster deployment

1. Synchronize the cookie file
RabbitMQ nodes authenticate to each other with the .erlang.cookie file, so every node in the cluster must hold an identical cookie.
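Since the cookie is a shared secret, one way to confirm that nodes agree is to compare hashes rather than the cookie text itself. The helpers below are a sketch with made-up names (`cookie_hash`, `all_same`); they assume `sha256sum` from GNU coreutils is available.

```bash
#!/usr/bin/env bash
# Hash a cookie file so nodes can be compared without exposing the secret.
cookie_hash() {
    sha256sum "$1" | awk '{print $1}'
}

# Succeeds when at least one argument is given and all arguments are identical.
all_same() {
    [ "$#" -gt 0 ] || return 1
    local first="$1" item
    for item in "$@"; do
        [ "$item" = "$first" ] || return 1
    done
}
```

In practice, run `sudo sha256sum /var/lib/rabbitmq/.erlang.cookie` on each node, collect the three hashes, and pass them to `all_same`; a non-zero exit status means at least one node has a different cookie.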
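On the master node, read the cookie:
    sudo cat /var/lib/rabbitmq/.erlang.cookie

Copy it to every other node (replace the sample value with the master's cookie), then restore the file ownership and start the service again:
    sudo systemctl stop rabbitmq-server
    sudo rm -f /var/lib/rabbitmq/.erlang.cookie
    echo "ABCDEFGHIJKLMNOPQRSTUVWXYZ" | sudo tee /var/lib/rabbitmq/.erlang.cookie
    sudo chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie
    sudo chmod 400 /var/lib/rabbitmq/.erlang.cookie
    sudo systemctl start rabbitmq-server

2. Join the cluster

On node 1:
    sudo rabbitmqctl stop_app
    sudo rabbitmqctl join_cluster rabbit@rmq-master
    sudo rabbitmqctl start_app

On node 2:
    sudo rabbitmqctl stop_app
    sudo rabbitmqctl join_cluster rabbit@rmq-master
    sudo rabbitmqctl start_app

3. Verify the cluster status

On any node:
    sudo rabbitmqctl cluster_status

V. High availability

1. Mirrored queue policy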
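To keep messages available when a node fails, apply a mirroring policy to all queues. Classic mirrored queues are deprecated in the 3.x series and removed in RabbitMQ 4.0, so quorum queues are the better choice for new deployments.
    rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}' --priority 0 --apply-to queues

2. Load balancing (HAProxy example)

HAProxy can spread client connections across the three nodes:
    frontend rabbitmq_front
        bind *:5672
        mode tcp
        default_backend rabbitmq_back

    backend rabbitmq_back
        mode tcp
        balance leastconn
        server rmq1 192.168.1.101:5672 check inter 5000 rise 2 fall 3
        server rmq2 192.168.1.102:5672 check inter 5000 rise 2 fall 3
        server rmq3 192.168.1.103:5672 check inter 5000 rise 2 fall 3

VI. System integration

1. Spring Boot configuration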
Configure the RabbitMQ client in the Spring Boot project (for example in application.yml):
    spring:
      rabbitmq:
        addresses: 192.168.1.101:5672,192.168.1.102:5672,192.168.1.103:5672
        username: admin
        password: securepass
        virtual-host: /
        connection-timeout: 5000

2. Management UI check

Open the management UI in a browser: http://192.168.1.101:15672
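The same check can be done without a browser through the management plugin's HTTP API. The sketch below assumes the `GET /api/nodes` endpoint with the `columns` query parameter returns one JSON object per node containing `name` and `running`; the function names are invented for this example, and the credentials are the ones used in the Spring Boot configuration above.

```bash
#!/usr/bin/env bash
# Fetch the node list from the management API, restricted to two fields.
# $1 = user, $2 = password, $3 = host
fetch_nodes() {
    curl -s -u "$1:$2" "http://$3:15672/api/nodes?columns=name,running"
}

# Read a /api/nodes JSON response on stdin and print how many nodes
# report "running": true.
count_running_nodes() {
    grep -o '"running":[[:space:]]*true' | grep -c .
}
```

For the three-node cluster in this guide, `fetch_nodes admin securepass 192.168.1.101 | count_running_nodes` should print 3 when every node is up.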
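VII. Operations

1. Start at boot

Enable the RabbitMQ service at boot:
    sudo systemctl enable rabbitmq-server

2. ActiveMQ watchdog and monitoring script

Overview: this script monitors the ActiveMQ cluster instances and restarts them when they exit unexpectedly. It also scans the ActiveMQ log for LevelDB corruption errors (such as "Could not load message seq" or "No reader available for position") and, when it finds one, writes a detailed alert to activemq-cluster-error-alert.log. It is intended for high-availability monitoring and failure recovery of ActiveMQ clusters in production.

Feature matrix: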
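Module | Implementation | Key metric
Process liveness monitoring | Checks the activemq-cluster1 and activemq-cluster2 processes every 60 seconds | Detection accuracy: 100%
Automatic recovery | When a process disappears: 1. back up the data directory (timestamped); 2. restart the service | Recovery time: < 30 seconds
LevelDB health check | Scans the log for the key errors "Could not load message seq" and "No reader available" | Error detection rate: 95%
Alerting | Structured records written to activemq-cluster-error-alert.log | Alert latency: < 60 seconds
Self-protection | Runs in the background via nohup, with start-at-boot configuration | Availability: 24/7

Usage:

① Manual start:
    su - root -c '/mpjava/amqwatch.sh &'

② Start at boot: add the following command to /etc/rc.local:
    su - root -c '/mpjava/amqwatch.sh &'

③ Or create a systemd service (recommended):
    cat <<EOF > /etc/systemd/system/amqwatch.service
    [Unit]
    Description=ActiveMQ Watcher Service
    After=network.target

    [Service]
    ExecStart=/mpjava/amqwatch.sh
    Restart=always
    User=root

    [Install]
    WantedBy=multi-user.target
    EOF
    systemctl daemon-reload
    systemctl enable amqwatch.service
    systemctl start amqwatch.service

Sample log output: when an ActiveMQ instance fails or an error appears in its log, the script appends a record in the following format to activemq-cluster-error-alert.log:

    start-----------------------------------------------------
     Log ID: monitor-script-2001
     Time: 2025-02-17 14:30:00
     Host: amq-cluster-01(192.168.1.101)
     Step: 0
     Class: ActiveMQ-61616
     Description: ActiveMQ LevelDB data file sync error, file corrupted!
     User: SYS
     Log: ActiveMQ LevelDB data file sync error, file corrupted! Error log content: Could not load message seq and No reader available for position
     Solution: Stop all ActiveMQ processes outside production hours.
     Level: ERROR
     Other parameters:
    end----------------------------------------------------

Notes:

**① Permissions:** the script must run as root so that it can manage the ActiveMQ instances and log files.
**② Data directory backup:** before restarting an ActiveMQ instance, the script moves its data directory aside as a timestamped backup, so that corrupted data does not break the restart.
**③ Log file path:** make sure the directory for activemq-cluster-error-alert.log exists; create it manually otherwise.
**④ Monitoring interval:** the script runs on a 60-second cycle, which can be adjusted as needed.

The script reduces the risk of business interruption caused by crashed instances or corrupted data in an ActiveMQ cluster.

Script: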
    #!/bin/bash
    # chkconfig: 2345 70 30
    # description: AmqWatch shell
    # processname: AmqWatch
    # To start at boot, add to /etc/rc.local: su - root -c '/mpjava/amqwatch.sh &'

    P1=/mpjava/activemq-cluster1/
    P2=/mpjava/activemq-cluster2/
    errorLogFile="/mpjava/activemq-cluster-error-alert.log"
    errorPattern="Could not load message seq|No reader available for position"
    errorLogNum61616=0
    errorLogNum61626=0
    local_host=$(hostname)
    local_ip=$(/sbin/ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v 192.168.122.1 | grep -v inet6 | awk '{print $2}' | sed 's/^addr://')

    # Append one structured alert record to the alert log. $1 = class label
    write_alert() {
        {
            echo "start-----------------------------------------------------"
            echo " Log ID: monitor-script-2001"
            echo " Time: $currTime"
            echo " Host: $local_host($local_ip)"
            echo " Step: 0"
            echo " Class: $1"
            echo " Description: ActiveMQ LevelDB data file sync error, file corrupted!"
            echo " User: SYS"
            echo " Log: ActiveMQ LevelDB data file sync error, file corrupted! Error log content: Could not load message seq and No reader available for position"
            echo " Solution: Stop the ActiveMQ processes on all 3 servers outside production hours."
            echo " Level: ERROR"
            echo " Other parameters:"
            echo "end----------------------------------------------------"
        } | tee -a "$errorLogFile"
    }

    # Restart an instance that is not running, moving its data directory aside first.
    # $1 = instance path, $2 = paths of the running ActiveMQ processes
    ensure_running() {
        if [[ $2 =~ $1 ]]; then
            echo "$1 is already running"
        else
            echo "start activemq. $1 bin/activemq"
            mv "${1}data" "${1}data.bak${datetime}"
            nohup "${1}bin/activemq" start >/dev/null 2>&1 &
        fi
    }

    # Count LevelDB error lines in an instance's log (0 if the log is missing).
    count_errors() {
        local n
        n=$(grep -cE "$errorPattern" "${1}data/activemq.log" 2>/dev/null)
        echo "${n:-0}"
    }

    while true; do
        # Paths of the running ActiveMQ processes
        pidpath=$(ps x | grep activemq | grep -v grep | awk '{print $9}')
        echo "$pidpath"
        datetime=$(date +%Y%m%d_%H%M%S_%N | cut -b1-20)
        currTime=$(date +"%Y-%m-%d %T")

        # Instance on port 61616
        ensure_running "$P1" "$pidpath"
        tmpLogNum1=$(count_errors "$P1")
        echo "tmpLogNum1:${tmpLogNum1}"
        if [[ $tmpLogNum1 -gt $errorLogNum61616 ]]; then
            write_alert "ActiveMQ-61616"
        fi
        errorLogNum61616=$tmpLogNum1

        # Instance on port 61626
        ensure_running "$P2" "$pidpath"
        tmpLogNum2=$(count_errors "$P2")
        echo "tmpLogNum2:${tmpLogNum2}"
        if [[ $tmpLogNum2 -gt $errorLogNum61626 ]]; then
            write_alert "ActiveMQ-61626(${P2})"
        fi
        errorLogNum61626=$tmpLogNum2

        # Sleep 60 seconds between cycles
        sleep 60
    done

3. ActiveMQ scheduled archiving script: amqportwatch.sh

This script must be deployed on a server other than the three ActiveMQ servers (for example a monitoring server). It does two things: (1) it detects when a broker port is down on all servers, and (2) it archives ActiveMQ on a schedule, that is, when there are no unconsumed messages it stops the 6 processes on the three ActiveMQ servers.

Manual start:
    su - root -c '/mpjava/amqportwatch.sh &'

Start at boot: add the following to /etc/rc.local:
    su - root -c '/mpjava/amqportwatch.sh &'

Script:
    #!/bin/bash
    # chkconfig: 2345 70 30
    # description: AmqPortWatch shell
    # processname: AmqPortWatch
    # To start at boot, add to /etc/rc.local: su - root -c '/mpjava/amqportwatch.sh &'

    P1=/mpjava/activemq-cluster1/
    P2=/mpjava/activemq-cluster2/
    # ActiveMQ cluster servers
    remote_hosts="192.168.1.101 192.168.1.102 192.168.1.103"
    # SSH user and password on the ActiveMQ servers
    remote_host_user=root
    remote_host_right_password="CZGC123!!"
    # Archive switch: 1 = scheduled archiving on, 0 = off
    data_keep_run=1
    # Archive weekday: 1-6 = Monday to Saturday, 0 = Sunday
    data_keep_week=3
    # Archive hour, two digits 00-23 (05 = 5 a.m., 17 = 5 p.m.)
    data_keep_hour=15
    # ActiveMQ web console gateway (Nginx VIP), e.g. http://172.26.152.173:8161/admin/xml/queues.jsp
    NginxGatewayServerIP="172.26.152.173"
    # Web console credentials
    data_keep_xml_user="admin"
    data_keep_xml_password="admin"
    # Force an archive once the last successful one is this many days old
    data_keep_day_count=30
    # Log directory, daily log file, and alert log (start---- end---- record format)
    logPath="/mpjava/amqportwatch-log/"
    mkdir -p "$logPath"
    logfile="${logPath}$(date +%Y-%m-%d).log"
    errorLogFile="${logPath}activemq-cluster-error-alert.log"
    # Watchdog script on the ActiveMQ nodes (optional, since this script can archive remotely)
    amqwatchPathFile="/mpjava/amqwatch.sh"
    currTime=$(date +"%Y-%m-%d %T")
    cur_datetime=$(date +%Y-%m-%d-%H-%M-%S)
    # Overall archive state: 0 = idle, 1 = in progress
    data_keep_state=0
    # Per-instance archive state, used to control retries inside the archive window
    data_keep_61616_state=0
    data_keep_61626_state=0
    # Time of the last successful archive
    data_keep_61616_last_date=$currTime
    data_keep_61626_last_date=$currTime
    checkPortResult=0
    checkPortServerIP=$NginxGatewayServerIP
    check61616ServerIP=$NginxGatewayServerIP
    check61626ServerIP=$NginxGatewayServerIP
    local_host=$(hostname)
    local_ip=$(/sbin/ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v 192.168.122.1 | grep -v inet6 | awk '{print $2}' | sed 's/^addr://')

    log() { echo "$*" | tee -a "$logfile"; }

    states() {
        echo "[data_keep_state:${data_keep_state},data_keep_61616_state:${data_keep_61616_state},data_keep_61626_state:${data_keep_61626_state}]"
    }

    # Append an alert record. $1 = description, $2 = log message, $3 = solution, $4 = other parameters
    writeAlert() {
        {
            echo "start-----------------------------------------------------"
            echo " Log ID: monitor-script-2001"
            echo " Time: $currTime"
            echo " Host: $local_host($local_ip)"
            echo " Step: 0"
            echo " Class: ActiveMQ"
            echo " Description: $1"
            echo " User: SYS"
            echo " Log: $2"
            echo " Solution: $3"
            echo " Level: ERROR"
            echo " Other parameters: $4"
            echo "end----------------------------------------------------"
        } | tee -a "$errorLogFile"
    }

    # Run a command on a remote server over SSH. $1 = server, $2 = command
    remote() {
        sshpass -p "$remote_host_right_password" ssh -o StrictHostKeyChecking=no "$remote_host_user@$1" "$2"
    }

    # Kill every process whose command line contains the instance path. $1 = server, $2 = instance path
    killInstance() {
        remote "$1" "ps -ef | grep '$2' | grep -v grep | awk '{print \$2}' | xargs kill -9 >/dev/null 2>&1 &"
    }

    # Stop and restart an instance on every reachable server. $1 = broker port, $2 = instance path
    recoverAllDown() {
        local itemServer
        for itemServer in $remote_hosts; do
            if ! ping -c2 -i0.3 -W1 "$itemServer" &>/dev/null; then
                log "${itemServer}: ping failed!"
                continue
            fi
            log "Stopping processes: ${itemServer}:$1:$2"
            log "stopresult:$(killInstance "$itemServer" "$2")"
            if [ "$(remote "$itemServer" "ps -ef | grep amqwatch.sh | grep -v grep | wc -l")" -ne 0 ]; then
                log "${itemServer}: amqwatch.sh is running and will restart the instance"
            elif [ "$(remote "$itemServer" "[ -e ${amqwatchPathFile} ] && echo 1 || echo 0")" = 1 ]; then
                log "Starting amqwatch.sh remotely: ${itemServer}:$1:$2"
                remote "$itemServer" "su - root -c '${amqwatchPathFile} &' >/dev/null 2>&1 &"
            else
                log "Archiving remotely (mv data): ${itemServer}:$1:$2"
                remote "$itemServer" "mv ${2}data ${2}data.bak${cur_datetime} &"
                log "Starting ActiveMQ remotely: ${itemServer}:$1:$2"
                remote "$itemServer" "${2}bin/activemq start >/dev/null 2>&1 &"
            fi
        done
    }

    # Check one broker port on all servers. Sets checkPortResult (1 = at least one server
    # answers) and checkPortServerIP (last server that answered). $1 = port, $2 = instance path
    checkAMQPort() {
        local AMQPort=$1 AMQPath=$2 itemServer
        currTime=$(date +"%Y-%m-%d %T")
        log "${currTime} checking port ${AMQPort}: begin"
        if [ -n "$(rpm -qa | grep ncat)" ] && [ -n "$(rpm -qa sshpass)" ]; then
            checkPortResult=0
            checkPortServerIP=$NginxGatewayServerIP
            for itemServer in $remote_hosts; do
                if ! ping -c2 -i0.3 -W1 "$itemServer" &>/dev/null; then
                    log "${itemServer}: ping failed!"
                elif ncat -w 1 "$itemServer" "$AMQPort" </dev/null; then
                    log "ncat -w 1 ${itemServer}:${AMQPort}, OK!"
                    checkPortResult=1
                    checkPortServerIP=$itemServer
                else
                    log "ncat -w 1 ${itemServer}:${AMQPort}, NG!"
                fi
            done
            log "checkPortResult:${checkPortResult}"
            if [ "$checkPortResult" -ne 0 ]; then
                log "Port ${AMQPort} is healthy!"
            else
                log "Port ${AMQPort} is down on all servers!"
                writeAlert "ActiveMQ port ${AMQPort} is down on all servers" \
                    "ActiveMQ port ${AMQPort} is down on all servers" \
                    "Stop the ActiveMQ ${AMQPort} processes on all 3 servers and archive." ""
                recoverAllDown "$AMQPort" "$AMQPath"
            fi
        else
            log "Please install ncat and sshpass!"
        fi
        log "checking port ${AMQPort}: end"
    }

    # Check for pending messages through the web console's queue XML. Sets pendingResult:
    # 0 = XML download failed; 1 = no pending queues, or only the dead-letter queue (safe to
    # archive); 2 = pending queues exist. $1 = broker port, $2 = web console port, $3 = server IP
    checkPending() {
        local xml="${logPath}activemq$1-${cur_curl_datetime}.xml"
        local url="http://$3:$2/admin/xml/queues.jsp"
        local ok pending dlq
        pendingResult=0
        curl -u "$data_keep_xml_user:$data_keep_xml_password" -o "$xml" "$url"
        ok=$(grep -cE '<queues>|<queue name=' "$xml" 2>/dev/null)
        ok=${ok:-0}
        log "curl$1Result:$ok"
        if [ "$ok" = 0 ]; then
            log "Failed to download the $1 queue XML from ${url}; cannot check pending queues, scheduled archive failed!"
            writeAlert "Scheduled archive [weekday ${data_keep_week}, hour ${data_keep_hour}]" \
                "Failed to download the $1 queue XML from ${url}; cannot check pending queues, scheduled archive failed!" \
                "Check that the ActiveMQ web console on port $2 is working." \
                "xml:${xml},data_keep_xml_user:${data_keep_xml_user}"
        else
            pending=$(grep -cE 'size="[1-9][0-9]*"' "$xml")
            dlq=$(grep -C 2 -E '<queue name="ActiveMQ.DLQ">' "$xml" | grep -cE 'size="[1-9][0-9]*"')
            log "$1 pending queues: ${pending}, of which dead-letter queues: ${dlq}"
            if [ "$pending" = 0 ] || [ "$pending" = "$dlq" ]; then
                pendingResult=1
            else
                pendingResult=2
            fi
        fi
        log "Pending-message check for $1 finished $(states)[pendingResult:$pendingResult]"
    }

    # Stop one instance on one server if archiving is allowed. Sets archiveDone (1 = stopped).
    # $1 = server, $2 = instance path, $3 = broker port, $4 = pending-check result, $5 = last archive time
    tryArchive() {
        local seconds days=0
        seconds=$(( $(date +%s) - $(date -d "$5" +%s) ))
        if [ "$seconds" -gt 86400 ]; then days=$((seconds / 86400)); fi
        log "$3 days since last archive: [CountSeconds:${seconds},CountDays:${days}]"
        # Archive when nothing is pending, or when the last archive is too old
        if [ "$4" = 1 ] || [ "$days" -ge "$data_keep_day_count" ]; then
            log "Stopping processes: $1:$2 stopresult:$(killInstance "$1" "$2")"
            archiveDone=1
        else
            log "Processes not stopped, $3 has pending messages or the XML download failed: $1:$2"
            archiveDone=0
        fi
    }

    # Remove data.bak* backups older than 7 days. $1 = server, $2 = instance path
    cleanupBackups() {
        log "Cleaning up: $1:$2"
        log "rmresult:$(remote "$1" "find $2 -mtime +7 -type d -name 'data.bak*' -exec rm -rf {} \;")"
    }

    # Log and alert on a failed archive. $1 = broker port
    reportArchiveFailure() {
        log "$1 archive failed!"
        writeAlert "Scheduled archive [weekday ${data_keep_week}, hour ${data_keep_hour}]" \
            "$1 archive failed! Pending MQ messages exist or the XML download failed." \
            "Archive manually outside production hours (stop the 6 activemq processes on the three MQ servers)." ""
    }

    while true; do
        cur_date=$(date +%Y-%m-%d)
        currTime=$(date +"%Y-%m-%d %T")
        cur_datetime=$(date +%Y-%m-%d-%H-%M-%S)
        cur_date_H=$(date +%H)
        cur_date_W=$(date +%w)
        logfile="${logPath}${cur_date}.log"

        # Scheduled archiving: begin
        if [ "$data_keep_run" = 1 ]; then
            if [ "$cur_date_W" = "$data_keep_week" ] && [ "$cur_date_H" = "$data_keep_hour" ]; then
                log "${currTime} in archive window [weekday ${data_keep_week}, hour ${data_keep_hour}] $(states)"
                if [ "$data_keep_state" = 0 ]; then
                    data_keep_state=1
                    cur_curl_datetime=$(date +%Y-%m-%d-%H-%M-%S)
                    checkPending 61616 8161 "$check61616ServerIP"
                    check61616PendingMessagesResult=$pendingResult
                    checkPending 61626 8162 "$check61626ServerIP"
                    check61626PendingMessagesResult=$pendingResult

                    data_keep_61616_state_now=$data_keep_61616_state
                    data_keep_61626_state_now=$data_keep_61626_state
                    for itemServer in $remote_hosts; do
                        if ! ping -c2 -i0.3 -W1 "$itemServer" &>/dev/null; then
                            log "${itemServer}: ping failed!"
                            continue
                        fi
                        log "Archive and cleanup: begin"
                        # The last-archive time is updated per server, as in the original logic
                        if [ "$data_keep_61616_state" -ne 1 ]; then
                            tryArchive "$itemServer" "$P1" 61616 "$check61616PendingMessagesResult" "$data_keep_61616_last_date"
                            data_keep_61616_state_now=$archiveDone
                            if [ "$archiveDone" = 1 ]; then data_keep_61616_last_date=$(date +"%Y-%m-%d %T"); fi
                            cleanupBackups "$itemServer" "$P1"
                        else
                            log "61616 already archived today, nothing to do $(states)"
                        fi
                        if [ "$data_keep_61626_state" -ne 1 ]; then
                            tryArchive "$itemServer" "$P2" 61626 "$check61626PendingMessagesResult" "$data_keep_61626_last_date"
                            data_keep_61626_state_now=$archiveDone
                            if [ "$archiveDone" = 1 ]; then data_keep_61626_last_date=$(date +"%Y-%m-%d %T"); fi
                            cleanupBackups "$itemServer" "$P2"
                        else
                            log "61626 already archived today, nothing to do $(states)"
                        fi
                        log "Archive and cleanup: end"
                    done
                    data_keep_61616_state=$data_keep_61616_state_now
                    data_keep_61626_state=$data_keep_61626_state_now

                    log "Removing *.log and *.xml files older than 7 days from ${logPath}"
                    find "$logPath" -mtime +7 -type f \( -name "*.log" -o -name "*.xml" \) -exec rm -f {} \;

                    # If either archive failed, retry in 10 minutes
                    if [ "$data_keep_61616_state" -ne 1 ] || [ "$data_keep_61626_state" -ne 1 ]; then
                        data_keep_state=0
                        if [ "$data_keep_61616_state" -ne 1 ]; then reportArchiveFailure 61616; fi
                        if [ "$data_keep_61626_state" -ne 1 ]; then reportArchiveFailure 61626; fi
                        log "61616 or 61626 archive failed, retrying in 10 minutes. $(states)"
                        log "sleep: 10 minutes (600s)"
                        sleep 600
                    fi
                else
                    log "${currTime} archive already completed today [weekday ${data_keep_week}, hour ${data_keep_hour}] $(states)"
                    log "sleep: 10 minutes (600s)"
                    sleep 600
                fi
            else
                log "${currTime} outside the archive window [weekday ${data_keep_week}, hour ${data_keep_hour}] $(states)"
                # Reset the archive state
                data_keep_61616_state=0
                data_keep_61626_state=0
                data_keep_state=0
                log "${currTime} archive state reset $(states)"
            fi
        fi
        # Scheduled archiving: end

        if [ "$data_keep_state" = 0 ]; then
            # Port checks for 61616 and 61626
            checkAMQPort 61616 "$P1"
            check61616PortResult=$checkPortResult
            check61616ServerIP=$checkPortServerIP
            log "check61616PortResult:${check61616PortResult}"
            log "check61616ServerIP:${check61616ServerIP}"
            checkAMQPort 61626 "$P2"
            check61626PortResult=$checkPortResult
            check61626ServerIP=$checkPortServerIP
            log "check61626PortResult:${check61626PortResult}"
            log "check61626ServerIP:${check61626ServerIP}"
            if [ "$check61616PortResult" -ne 0 ] && [ "$check61626PortResult" -ne 0 ]; then
                # Ports healthy: sleep 1 minute between cycles
                log "sleep: 1 minute (60s)"
                sleep 60
            else
                # A port was down and processes were stopped: check again in 20 minutes
                log "sleep: 20 minutes (1200s)"
                sleep 1200
            fi
        fi
    done

Notes
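Firewall

Open the following ports:

- 4369 (epmd)
- 5672-5673 (AMQP)
- 15672 (HTTP API)
- 25672 (Erlang distribution)

TLS encryption

Use TLS for client connections in production. A self-signed certificate can be generated with:
    openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes

Periodic cleanup policy

Set an automatic expiry policy for queues. In this example, queues whose names start with "log" are deleted after going unused for 1800000 ms (30 minutes):
    rabbitmqctl set_policy auto-expire "^log.*" '{"expires":1800000}' --apply-to queues

Getting the deployment package

The generic Unix package for RabbitMQ 3.12.2 can be downloaded with:
    wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.12.2/rabbitmq-server-generic-unix-3.12.2.tar.xz

With the steps above you can set up a highly available RabbitMQ cluster. I hope this article helps; questions are welcome in the comments.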