博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq简单使用
阅读量:6074 次
发布时间:2019-06-20

本文共 22369 字,大约阅读时间需要 74 分钟。

hot3.png

rabbitmq提供了6中消息队列模型

  1. 简单模式

  • 连接rabbitmq
public class RabbitmqConnectionUtil {    public static Connection getConnection() throws IOException, TimeoutException {        // 连接工厂        ConnectionFactory connectionFactory = new 	ConnectionFactory();        // 服务器地址        connectionFactory.setHost("localhost");        //服务器端口        connectionFactory.setPort(5672);        // rabbitmq用户名        connectionFactory.setUsername("guest");        // rabbitmq密码        connectionFactory.setPassword("guest");        return connectionFactory.newConnection();    }}
  • 生产者
public class Sender {    public static final String QUEUE_NAME = "test_queue";    public static void main(String[] args) throws IOException, TimeoutException {        // 获取rabbitmq 服务器连接        Connection connection = RabbitmqConnectionUtil.getConnection();        // 创建rabbitmq的队列        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 将消息发布到队列        String message = "hello, world";        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());        // 关闭        channel.close();        connection.close();    }}
  • 消费者
public class Receive {    private static final String QUEUE_NAME = "test_queue";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Connection connection = RabbitmqConnectionUtil.getConnection();        // 创建通道        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 定义消费者        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println(message);            }        };        // 绑定通道和消费者        channel.basicConsume(QUEUE_NAME, false, consumer);    }    private static void oldApi() throws IOException, TimeoutException, InterruptedException {        Connection connection = RabbitmqConnectionUtil.getConnection();        // 从连接中创建通道        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 定义消费者        QueueingConsumer consumer = new QueueingConsumer(channel);        // 监听队列        channel.basicConsume(QUEUE_NAME, false, consumer);        while (true){            QueueingConsumer.Delivery delivery = consumer.nextDelivery();            String message = new String(delivery.getBody());            System.out.println("[Rec]:"+ message);        }    }}

简单模型:生产者与消费者是一一对应的,一个生产者只能有一个消费者,如果多个的话,必须创建多个,耦合性强

2. Work Queues
Simple队列是生产者和消费者一一对应的,实际开发中,生产者发送消息是毫不费力的,消费者一般需要处理业务,花费的时间较长,如果使用Simple会造成消息的挤压

  • 生产者
public class Send {    public static final String QUEUE_NAME = "test_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();                // 创建channal        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);                //发布信息        for (int i = 0; i < 50; i++) {            String message = "hello "+i;            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());            try {                Thread.sleep(i*100);            } catch (InterruptedException e) {                e.printStackTrace();            }        }        channel.close();        connection.close();    }}
  • 消费者一
public class Rece1 {    public static final String QUEUE_NAME = "test_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[1] reced " + message);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("done");                }            }        };        channel.basicConsume(QUEUE_NAME, true, consumer);    }}
  • 消费者二 和消费者一代码一样 消费1和消费2处理的消息是一样的,轮训去消费消费队列里面的消息, 轮训分发(Round-Robin)
    消费者1:都是奇数
    消费者2:都是偶数
    不管消费端谁忙谁闲,都不会多个一个消息,任务消息你一个我一个 公平分发:多劳多得
  1. 公平分发, 多劳多得
  • 生产者
public class Send {    public static final String QUEUE_NAME = "test_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();                // 创建channal        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        /*         每个消费者发送确认消息之前,消息队列不再发送下一个消息到消费者,一次只处理一个消息         */        int prefectCount = 1;        channel.basicQos(prefectCount);        //发布信息        for (int i = 0; i < 50; i++) {            String message = "hello "+i;            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());        }        channel.close();        connection.close();     }}
  • 消费者一
public class Rece1 {    public static final String QUEUE_NAME = "test_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.basicQos(1);        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[1] reced " + message);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("done");                }                // 完成之后,手动应答                channel.basicAck(envelope.getDeliveryTag(), false);            }        };        //关闭自动应答        boolean ack = false;        channel.basicConsume(QUEUE_NAME, ack, consumer);    }}
  • 消费者二
public class Rece2 {    public static final String QUEUE_NAME = "test_queue";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.basicQos(1);        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[1] reced " + message);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("done");                }                // 手动应答                channel.basicAck(envelope.getDeliveryTag(), false);            }        };        //关闭自动应答        boolean ack = false;        channel.basicConsume(QUEUE_NAME, ack, consumer);    }}
  1. 消费应答与消息持久化
  • 消息应答 boolean ack = false;
    channel.basicConsume(QUEUE_NAME, ack, consumer);
    boolean ack = true: 自动应答,一旦rabbitmq将消息分发给消费者,就会从内存中删除,这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息

boolean ack = false 手动模式,如果一个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理掉了,你可以删除了,然后rabbitmq就删除内存中的消息

消息应答模式是打开的,false
如果这种方式 rabbitmq挂的话,整个消息都会丢失

  • 消息持久化
boolean durable = false;channel.queueDeclare(QUEUE_NAME, false, false, false, null);durable=true,设置为true,如果在代码运行前QUEUE_NAME队列已经存在,并且不是持久化的,rabbitmq不允许重新定义一个已经存在的队列
  1. 订阅模式
    解读:
  • 一个生产者,多个消费者
  • 每个消费者都有自己的队列
  • 生产者没有直接将消息发送给队列,而是到了交换机exchange
  • 每个队列都绑定到交换机上
  • 生产者发送消息 经过交换机 到达队列就能实现一个消息被多个消费者消费
  • 生产者
public class Send {    public static final String EXCHANGE_NAME = "test_exchange_fanout";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明交换机        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  // 分发        // 发送消息        String message = "hello world";        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());        channel.close();        connection.close();    }}
  • 消费者一
public class Receive {    public static final String QUEUE_NAME = "test_queue_email";    public static final String EXCHANGE_NAME = "test_exchange_fanout";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false,null);        // 绑定交换机        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");        channel.basicQos(1);        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[1] reced " + message);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("done");                }                // 手动应答                channel.basicAck(envelope.getDeliveryTag(), false);            }        };        //关闭自动应答        boolean ack = false;        channel.basicConsume(QUEUE_NAME, ack, consumer);    }}
  • 消费者二
public class Receive2 {    public static final String QUEUE_NAME = "test_queue_sms";    public static final String EXCHANGE_NAME = "test_exchange_fanout";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明队列        channel.queueDeclare(QUEUE_NAME, false, false, false,null);        // 绑定交换机        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");        channel.basicQos(1);        DefaultConsumer consumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[2] reced " + message);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                } finally {                    System.out.println("done");                }                // 手动应答                channel.basicAck(envelope.getDeliveryTag(), false);            }        };        //关闭自动应答        boolean ack = false;        channel.basicConsume(QUEUE_NAME, ack, consumer);    }}
  1. Direct 模式
  • 生产者
public class Send {    private static final String EXCHANGE_NAME = "test_exchange_direct";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        // 声明exchange        channel.exchangeDeclare(EXCHANGE_NAME, "direct");        channel.basicQos(1);        String message = "hello direct";        // 发送消息        String routingKey = "warning";        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());        System.out.println("send " +  message);        channel.close();        connection.close();    }}
  • 消费者一
public class Recv1 {    private static final String EXCHANGE_NAME = "test_exchange_direct";    private static final String QUEUE_NAME = "test_queue_direct_1";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.basicQos(1);        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[1] Recv "+ message);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    // 手动应答                    System.out.println("done");                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        // 关闭自动应答        boolean ack = false;        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);    }}
  • 消费者二
public class Recv2 {    private static final String EXCHANGE_NAME = "test_exchange_direct";    private static final String QUEUE_NAME = "test_queue_direct_2";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.basicQos(1);        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[1] Recv "+ message);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    // 手动应答                    System.out.println("done");                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        // 关闭自动应答        boolean ack = false;        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);    }}
  1. Topic 主题模式

  • 生产者
public class Send {    private static final String EXCHANGE_NAME = "test_exchange_topic";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.exchangeDeclare(EXCHANGE_NAME, "topic");        String routingKey = "goods.update";        String message = "商品。。。。";        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());        channel.close();        connection.close();    }}
  • 消费者一
public class Recv1 {    private static final String EXCHANGE_NAME = "test_exchange_topic";    private static final String QUEUE_NAME = "test_queue_topic_2";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.basicQos(1);        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[2] Recv "+ message);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    // 手动应答                    System.out.println("done");                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        // 关闭自动应答        boolean ack = false;        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);    }}
  • 消费者二
public class Recv2 {    private static final String EXCHANGE_NAME = "test_exchange_topic";    private static final String QUEUE_NAME = "test_queue_topic_1";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.basicQos(1);        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body, "UTF-8");                System.out.println("[1] Recv "+ message);                try {                    Thread.sleep(2000);                } catch (InterruptedException e) {                    e.printStackTrace();                }finally {                    // 手动应答                    System.out.println("done");                    channel.basicAck(envelope.getDeliveryTag(), false);                }            }        };        // 关闭自动应答        boolean ack = false;        channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);    }}
  1. 消息确认机制
    生产者将消息发送出去之后,消息到底有没有到达rabbitmq服务器 默认情况不知道。有两种方式
  • AMQP协议实现了事务机制
  • Confirm模式
    事务机制
    txSelect、txCommit、txRollback
    txSelect:用户将当前channel设置transation模式
    txCommit:用于提交事务
    txRollback:回滚事务
  • 生产者
public class TxSend {    private static final String QUEUE_NAME = "test_queue_tx";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 发送数据        String message  = "send  tx message";        try{            channel.txSelect();            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());            int tt = 1/0;            channel.txCommit();        }catch (Exception e){            channel.txRollback();            System.out.println("发送消息失败");        }        channel.close();        connection.close();    }}
  • 消费者
public class TxRecv {    private static final String QUEUE_NAME = "test_queue_tx";    public static void main(String[] args) throws IOException, TimeoutException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                String message = new String(body);                System.out.println("[recv] message: " + message);            }        });    }}
  1. Confirm模式
    实现原理 生产者将信道设置为Confirm模式,一旦进入confirm模式,所有在该信道上面发送的消息都被指派一个唯一的id,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者,使得生产者知道消息被发送到目的队列了,如果消息和队列是可持久的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中 driver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序号之前所有的消息都已经得到了处理。存在两种方式:
    (1)、普通 发送一条
    (2)、批量发送
    (3)、异步发送confirm模式:提供一个回调方法
    普通模式
public class Send {    private static final String QUEUE_NAME = "test_queue_confirm_1";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 开启confirm模式,一个队列只能有一种模式        channel.confirmSelect();        String message = "send confirm...";        channel.basicPublish("", QUEUE_NAME, null , message.getBytes());        if(!channel.waitForConfirms()){            System.out.println("confirm send failed");        }else{            System.out.println("confirm send ok ");        }        // 关闭信道和连接        channel.close();        connection.close();    }

批量模式

public class Send2 {    private static final String QUEUE_NAME = "test_queue_confirm_1";    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {        Connection connection = RabbitmqConnectionUtil.getConnection();        Channel channel = connection.createChannel();        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        // 开启confirm模式,一个队列只能有一种模式        channel.confirmSelect();        String message = "send confirm...";        for (int i = 0; i < 10; i++) {            channel.basicPublish("", QUEUE_NAME, null , message.getBytes());        }        if(!channel.waitForConfirms()){            System.out.println("confirm send failed");        }else{            System.out.println("confirm send ok ");        }        // 关闭信道和连接        channel.close();        connection.close();    }}

转载于:https://my.oschina.net/u/1433803/blog/3009062

你可能感兴趣的文章
Linux的文件权限
查看>>
通过设置Referer反"反盗链"
查看>>
Swift中的协议
查看>>
搬进Github
查看>>
oracle子查询
查看>>
cocos2d-x-2.2.5项目创建--命令行创建
查看>>
Risk(最短路)
查看>>
ngRoute 和 ui.router 的使用方法和区别
查看>>
IndiaHacks 2016 - Online Edition (Div. 1 + Div. 2) D. Delivery Bears 二分+网络流
查看>>
struts2中页面访问action的url问题,或许很简单
查看>>
EF Code First 学习笔记:关系
查看>>
认识Java Core和Heap Dump
查看>>
【转】所需即所获:像 IDE 一样使用 vim
查看>>
[navicat] Navicat for Oracle Cannot load OCI DLL
查看>>
红米NOTE应用闪退(包括系统设置等各种界面)问题解决经历
查看>>
五种方式让你在java中读取properties文件内容不再是难题
查看>>
php之aop实践
查看>>
CentOS7安装配置redis-3.0.0
查看>>
python安装libxml2和pyquery
查看>>
Oracle 权限(grant、revoke)
查看>>