# java_消息队列

# 基础知识

# RabbitMQ 、RockerMQ比较

# RabbitMQ

RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

RabbitMQ优点

由于erlang语言的特性,mq 性能较好,高并发;

吞吐量到万级,MQ功能比较完备

健壮、稳定、易用、跨平台、支持多种语言、文档齐全;

开源提供的管理界面非常棒,用起来很好用

社区活跃度高;

RabbitMQ缺点:

erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护。

RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。

需要学习比较复杂的接口和协议,学习和维护成本较高。

# RocketMQ

RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。

RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

RocketMQ优点:

单机吞吐量:十万级

可用性:非常高,分布式架构

消息可靠性:经过参数优化配置,消息可以做到0丢失

功能支持:MQ功能较为完善,还是分布式的,扩展性好

支持10亿级别的消息堆积,不会因为堆积导致性能下降

源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控

RocketMQ缺点:

支持的客户端语言不多,目前是java及c++,其中c++不成熟;

社区活跃度一般

没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

# 消息中间件的选择

# Kafka

Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。

大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。

# RocketMQ

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。

RoketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择RocketMQ。

# RabbitMQ

RabbitMQ : 结合erlang语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护。不过,RabbitMQ的社区十分活跃,可以解决开发过程中遇到的bug。如果你的数据量没有那么大,小公司优先选择功能比较完备的RabbitMQ。

# RabbitMQ 的使用

# 安装

# Docker方式安装

https://hub.docker.com/

sudo docker pull rabbitmq:management

docker run --name rabbitmq-server \
	--net docker-network --ip 172.16.0.9 \
	-p 5671:5671 -p 5672:5672 -p 15671:15671 -p 15672:15672 \
	-p 4369:4369 -p 15961:15961 -p 15962:15962 \
	-v /docker/rabbitmq:/var/lib/rabbitmq \
	--hostname rabbitmqServer \
	-e RABBITMQ_DEFAULT_VHOST=rabbitmqServer \
	-e RABBITMQ_DEFAULT_USER=admin \
	-e RABBITMQ_DEFAULT_PASS=admin \
	--restart=always \
	-d rabbitmq:management
1
2
3
4
5
6
7
8
9
10
11
12
13
  • -d 后台运行容器;
  • --name 指定容器名;
  • -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
  • -v 映射目录或文件;
  • --hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
  • -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

# 消息的使用

@RabbitListener(queues = RabbitConfig.QUEUE_A)
public void handleMessage(Message message,Channel channel) throws  IOException{
    try {
        String json = new String(message.getBody());
        JSONObject jsonObject = JSONObject.fromObject(json);
        log.info("消息了【】handleMessage" +  json);
        int i = 1/0;
        //业务处理。
        /**
        * 防止重复消费,可以根据传过来的唯一ID先判断缓存数据中是否有数据
        * 1、有数据则不消费,直接应答处理
        * 2、缓存没有数据,则进行消费处理数据,处理完后手动应答
        * 3、如果消息 处理异常则,可以存入数据库中,手动处理(可以增加短信和邮件提醒功能)
        */
        //手动应答
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }catch (Exception e){
        log.error("消费消息失败了【】error:"+ message.getBody());
        log.error("OrderConsumer  handleMessage {} , error:",message,e);
        // 处理消息失败,将消息重新放回队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
  • deliveryTag:该消息的index
  • multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息
  • requeue:被拒绝的是否重新入队列 注意:如果设置为true ,则会添加在队列的末端
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
1