简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

Docker安装

  • docker安装非常方便,强烈建议使用docker
  • 注意设置成自己想要的账号密码
docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:3-management

访问

  • 访问http://{{ip}}:15672/
  • 输入安装时设置的账号密码即可登陆

simple 模式

截屏20210412 下午6.24.18.png

  • 简单模式:即一个生产者,一个消费者
  • 创建一个maven工程,并添加rabbitMQ依赖
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
  • 创建一个Producer类,代码如下
public class Producer {

    public static void main(String[] args) {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection("生产者");
            //通过连接获取channel
            channel = connection.createChannel();
            //通过创建交换机,声明队列、绑定关系、路由key、发送消息和接受消息
            String queueName = "queue1";
            //参数:队列名、是否持久化、排他性、是否自动删除、附加参数
            channel.queueDeclare(queueName, false, false, false, null);
            //准备消息内容
            String message = "Hello World!!";
            //发送消息给队列
            channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("发送消息成功!!");
        } catch (Exception ex) {
            ex.printStackTrace();
        }finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

}
  • 注意:VirtualHost需要设置成admin页面下的Virtual Hosts下的name,可以对其进行添加,我这里添加了name为"/"的值;这里的虚拟机节点只是为了做隔离,类似mysql的数据库一样。
  • 调试代码,代码走到创建连接、channel、queue时可以对应管理页面相应的页签查看变化
  • 当代码运行完毕,在管理页面的Overview下可以看到变化:

截屏20210412 下午6.36.36.png

  • 说明此时生产但未消费的消息有1条
  • 创建Consumer类用于消费,代码如下:
public class Consumer {

    public static void main(String[] args) {
        //创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("root");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //创建连接
            connection = connectionFactory.newConnection("生产者");
            //通过连接获取channel
            channel = connection.createChannel();
            //通过创建交换机,声明队列、绑定关系、路由key、发送消息和接受消息
            channel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery message) throws IOException {
                    System.out.println("收到消息是" + new String(message.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                    System.out.println("接受失败了...");
                }
            });

            System.out.println("开始接受消息");
            System.in.read();

        } catch (Exception ex) {
            ex.printStackTrace();
        }finally {
            //关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            //关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

}
  • 运行Consumer类的main方法,可以看到控制台输出收到消息是Hello World!!,此时再看Overview页面变化,说明消息已成功被消费

截屏20210412 下午6.40.37.png

一些概念

截屏20210415 下午4.07.20.png

  • channel(通道)通过交换机才能将消息传给队列;每个队列都需要绑定一个交换机,如果没有指定交换机就会自动绑定默认交换机
  • 交换机通过路由key将消息发送给不同的队列
  • 管理界面的Features列下面的【D】指的是是否为持久化,如果不是持久化也会存盘,但是当服务器重启后就会被清除

手动发送消息

发送消息

  1. 点击管理页面中Exchanges进入交换机界面
  2. 点击任意一个虚拟机节点下的默认交换机(AMQP default)
  3. 点击Publish message,在路由key中输入一个已存在的队列名例如queue1,如果没有队列就去Queues页面下添加一个
  4. 在Payload中填写消息体,点击Publish Message即可完成消息的发送

接收消息

  1. 点击Queues页面,并选择一个有消息的队列点击进入
  2. 选择Get messages页签,Ack Mode选择Nack,这样消息接收完不会被消费,生产中千万不要选择ack,否则会被消费掉
  3. Messages中输入要获取消息的数量,点击Get messages即可获取消息

未完待写...

Q.E.D.


To see the world as it is and to love it.