RabbitMQ provides six messaging models.
- Simple mode
- Connecting to RabbitMQ
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class RabbitmqConnectionUtil {

        public static Connection getConnection() throws IOException, TimeoutException {
            // Connection factory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // Broker host
            connectionFactory.setHost("localhost");
            // Broker port
            connectionFactory.setPort(5672);
            // RabbitMQ user name
            connectionFactory.setUsername("guest");
            // RabbitMQ password
            connectionFactory.setPassword("guest");
            return connectionFactory.newConnection();
        }
    }
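- Producer
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Sender {

        public static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            // Get a connection to the RabbitMQ server
            Connection connection = RabbitmqConnectionUtil.getConnection();
            // Open a channel and declare the queue
            // (arguments: name, durable, exclusive, autoDelete, extra arguments)
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Publish the message to the queue through the default exchange ""
            String message = "hello, world";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            // Close the channel and the connection
            channel.close();
            connection.close();
        }
    }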
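- Consumer
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.QueueingConsumer;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Receive {

        private static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            // Create a channel
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Define the consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(message);
                }
            };
            // Attach the consumer to the queue. autoAck is true here because
            // handleDelivery never calls basicAck; with false the message would stay unacknowledged.
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }

        // Older API. QueueingConsumer was deprecated in the 4.x client and is no longer
        // present in 5.x, so this method (and its import) only compiles against older versions.
        private static void oldApi() throws IOException, TimeoutException, InterruptedException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            // Create a channel from the connection
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Define the consumer
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // Listen on the queue (autoAck, for the same reason as above)
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("[Rec]:" + message);
            }
        }
    }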
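Simple model: the producer and the consumer are paired one to one. A producer can have only one consumer; to handle more, you have to create more producer/consumer pairs, so the two sides are tightly coupled.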
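- Work Queues
In the simple queue the producer and consumer are paired one to one. In real projects, publishing a message costs the producer almost nothing, while the consumer usually has business logic to run and takes much longer per message. With the simple model the messages pile up in the queue.
- Producer
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {

        public static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            // Create the channel
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Publish 50 messages, pausing a little longer after each one
            for (int i = 0; i < 50; i++) {
                String message = "hello " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                try {
                    Thread.sleep(i * 100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            channel.close();
            connection.close();
        }
    }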
- Consumer 1
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Rece1 {

        public static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[1] reced " + message);
                    try {
                        // Simulate slow business logic
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("done");
                    }
                }
            };
            // Automatic acknowledgement
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
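- Consumer 2
The code is the same as consumer 1.
Consumer 1 and consumer 2 end up handling the same number of messages, because RabbitMQ hands the messages in the queue to the consumers in turn. This is round-robin dispatch:
consumer 1 always gets the odd-numbered messages, consumer 2 always gets the even-numbered ones.
It does not matter which consumer is busy and which is idle: neither ever receives an extra message, and the tasks are dealt out one for you, one for me. The alternative is fair dispatch, where whoever works more gets more.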
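The round-robin behaviour described above can be modelled without a broker. The sketch below is plain Java and only an illustration: the class `RoundRobinSketch` and its `dispatch` method are made up for this example and are not part of the RabbitMQ client. It assigns message `i` to consumer `i % consumerCount`, which is why processing speed plays no role.

```java
import java.util.ArrayList;
import java.util.List;

class RoundRobinSketch {

    // Assign message i to consumer (i % consumerCount), ignoring how busy that consumer is.
    static List<List<Integer>> dispatch(int messageCount, int consumerCount) {
        List<List<Integer>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; i++) {
            received.get(i % consumerCount).add(i);
        }
        return received;
    }

    public static void main(String[] args) {
        List<List<Integer>> received = dispatch(6, 2);
        System.out.println("first consumer:  " + received.get(0)); // [0, 2, 4]
        System.out.println("second consumer: " + received.get(1)); // [1, 3, 5]
    }
}
```

Which consumer ends up with the odd-numbered messages depends on the order in which the consumers registered.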
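- Fair dispatch: whoever works more gets more
- Producer
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {

        public static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            // Create the channel
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /*
             * Prefetch count 1: until a consumer acknowledges its current message, the queue
             * does not send it the next one, so each consumer handles one message at a time.
             * The limit applies to consumers; on this publish-only channel the call has no
             * effect, and the consumers below set it on their own channels.
             */
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            // Publish the messages
            for (int i = 0; i < 50; i++) {
                String message = "hello " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            }
            channel.close();
            connection.close();
        }
    }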
- Consumer 1
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Rece1 {

        public static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // At most one unacknowledged message at a time
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[1] reced " + message);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("done");
                    }
                    // Acknowledge manually once processing is finished
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // Turn automatic acknowledgement off
            boolean ack = false;
            channel.basicConsume(QUEUE_NAME, ack, consumer);
        }
    }
- Consumer 2
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Rece2 {

        public static final String QUEUE_NAME = "test_queue";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // At most one unacknowledged message at a time
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[2] reced " + message);
                    try {
                        // Twice as fast as consumer 1
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("done");
                    }
                    // Acknowledge manually
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // Turn automatic acknowledgement off
            boolean ack = false;
            channel.basicConsume(QUEUE_NAME, ack, consumer);
        }
    }
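To see why a prefetch count of 1 plus manual acknowledgement gives the faster consumer more work, here is a small broker-free simulation. It is only a sketch under simplifying assumptions: all 50 messages are already waiting in the queue, the next message always goes to the consumer that becomes idle first, and network latency is ignored. `FairDispatchSketch` and `dispatch` are hypothetical names for this example.

```java
class FairDispatchSketch {

    // costMillis[c] is the time consumer c needs per message.
    // Returns how many messages each consumer ends up processing.
    static int[] dispatch(int messageCount, long[] costMillis) {
        long[] freeAt = new long[costMillis.length]; // time at which each consumer is idle again
        int[] handled = new int[costMillis.length];
        for (int i = 0; i < messageCount; i++) {
            // Pick the consumer that becomes idle first (ties go to the lower index)
            int next = 0;
            for (int c = 1; c < freeAt.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += costMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Same processing times as Rece1 (2000 ms) and Rece2 (1000 ms)
        int[] handled = dispatch(50, new long[] {2000, 1000});
        // prints "slow consumer: 17, fast consumer: 33"
        System.out.println("slow consumer: " + handled[0] + ", fast consumer: " + handled[1]);
    }
}
```

With round-robin the split would be 25/25 regardless of speed; here the consumer that is twice as fast handles roughly twice as many messages.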
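- Message acknowledgement and message durability
- Message acknowledgement
    boolean ack = false;
    channel.basicConsume(QUEUE_NAME, ack, consumer);
ack = true: automatic acknowledgement. As soon as RabbitMQ has delivered a message to a consumer, it deletes the message from memory. If the consumer is killed while it is still working, the message it was processing is lost.
ack = false: manual acknowledgement. If a consumer dies, the message is handed to another consumer. The consumer sends an acknowledgement to tell RabbitMQ that the message has been processed and can be deleted, and only then does RabbitMQ remove it from memory.
With ack = false, manual acknowledgement is switched on. This only protects against a failing consumer, though: if RabbitMQ itself goes down, all messages are still lost.
- Message durability
    boolean durable = false;
    channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
Set durable = true to make the queue survive a broker restart. If a queue named QUEUE_NAME already exists and is not durable, simply changing the flag in code fails, because RabbitMQ does not allow an existing queue to be redeclared with different parameters. Note that a durable queue alone does not persist its messages; they also have to be published as persistent, for example with MessageProperties.PERSISTENT_TEXT_PLAIN.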
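The manual acknowledgement flow can be pictured as two collections inside the broker: messages ready for delivery and messages delivered but not yet acknowledged. The model below is a simplified sketch in plain Java, not RabbitMQ's actual implementation; `ManualAckSketch` and all of its methods are invented for illustration, and it assumes a single consumer.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

class ManualAckSketch {
    private final Deque<String> ready = new ArrayDeque<>();        // waiting for delivery
    private final TreeMap<Long, String> unacked = new TreeMap<>(); // delivered, not yet acknowledged
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Deliver the next message and return its delivery tag, or -1 if the queue is empty.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        unacked.put(nextTag, message);
        return nextTag++;
    }

    // The consumer reports that processing finished; only now is the message dropped.
    void ack(long deliveryTag) {
        unacked.remove(deliveryTag);
    }

    // The consumer died: everything it had not acknowledged goes back to the front of the queue.
    void consumerDied() {
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        ManualAckSketch queue = new ManualAckSketch();
        queue.publish("a");
        queue.publish("b");
        queue.ack(queue.deliver());  // "a" is processed and acknowledged
        queue.deliver();             // "b" is delivered but never acknowledged
        queue.consumerDied();
        System.out.println(queue.readyCount()); // 1: "b" is available for redelivery
    }
}
```

With automatic acknowledgement there would be no `unacked` collection at all, which is why a message in flight is lost when the consumer dies.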
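- Publish/Subscribe mode
How it works:
- One producer, multiple consumers
- Each consumer has its own queue
- The producer does not send messages directly to a queue but to an exchange
- Every queue is bound to the exchange
- A message sent by the producer passes through the exchange to the queues, so a single message is consumed by multiple consumers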
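The points above boil down to "the exchange copies each message into every bound queue". A minimal in-memory model, with `FanoutSketch` and its methods as hypothetical names chosen for this example, might look like this:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FanoutSketch {
    // queue name -> messages currently sitting in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // A fanout exchange ignores the routing key and copies the message to every bound queue.
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("test_queue_email");
        exchange.bind("test_queue_sms");
        exchange.publish("hello world");
        System.out.println(exchange.messagesIn("test_queue_email")); // [hello world]
        System.out.println(exchange.messagesIn("test_queue_sms"));   // [hello world]
    }
}
```

The model also shows a consequence worth remembering: an exchange stores nothing itself, so a message published while no queue is bound goes nowhere.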
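- Producer
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {

        public static final String EXCHANGE_NAME = "test_exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // Declare the exchange; "fanout" broadcasts to every bound queue
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // Publish to the exchange; the routing key is ignored by a fanout exchange
            String message = "hello world";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            channel.close();
            connection.close();
        }
    }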
- Consumer 1
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Receive {

        public static final String QUEUE_NAME = "test_queue_email";
        public static final String EXCHANGE_NAME = "test_exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Bind the queue to the exchange
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[1] reced " + message);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("done");
                    }
                    // Acknowledge manually
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // Turn automatic acknowledgement off
            boolean ack = false;
            channel.basicConsume(QUEUE_NAME, ack, consumer);
        }
    }
- Consumer 2
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Receive2 {

        public static final String QUEUE_NAME = "test_queue_sms";
        public static final String EXCHANGE_NAME = "test_exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Bind the queue to the exchange
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            channel.basicQos(1);
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[2] reced " + message);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("done");
                    }
                    // Acknowledge manually
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // Turn automatic acknowledgement off
            boolean ack = false;
            channel.basicConsume(QUEUE_NAME, ack, consumer);
        }
    }
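- Direct mode
- Producer
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {

        private static final String EXCHANGE_NAME = "test_exchange_direct";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // Prefetch limit; it applies to consumers and has no effect on a publish-only channel
            channel.basicQos(1);
            String message = "hello direct";
            // Publish with a routing key; only queues bound with this exact key receive the message
            String routingKey = "warning";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println("send " + message);
            channel.close();
            connection.close();
        }
    }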
- Consumer 1
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {

        private static final String EXCHANGE_NAME = "test_exchange_direct";
        private static final String QUEUE_NAME = "test_queue_direct_1";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // This queue only receives messages published with the routing key "error"
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[1] Recv " + message);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // Acknowledge manually
                        System.out.println("done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean ack = false;
            channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
        }
    }
- Consumer 2
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {

        private static final String EXCHANGE_NAME = "test_exchange_direct";
        private static final String QUEUE_NAME = "test_queue_direct_2";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // This queue is bound with three routing keys
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[2] Recv " + message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // Acknowledge manually
                        System.out.println("done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean ack = false;
            channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
        }
    }
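The routing rule of a direct exchange is an exact match between the message's routing key and the key a queue was bound with. The following broker-free sketch reproduces the bindings of the two consumers above; `DirectRoutingSketch`, `bind` and `route` are hypothetical names used only for this illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class DirectRoutingSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new TreeSet<>()).add(queueName);
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("test_queue_direct_1", "error");
        exchange.bind("test_queue_direct_2", "error");
        exchange.bind("test_queue_direct_2", "info");
        exchange.bind("test_queue_direct_2", "warning");
        System.out.println(exchange.route("warning")); // [test_queue_direct_2]
        System.out.println(exchange.route("error"));   // [test_queue_direct_1, test_queue_direct_2]
        System.out.println(exchange.route("debug"));   // []
    }
}
```

So the producer's "warning" message reaches only the second consumer, while an "error" message would reach both.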
- Topic mode
- Producer
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {

        private static final String EXCHANGE_NAME = "test_exchange_topic";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // Routing keys of a topic exchange are dot-separated words
            String routingKey = "goods.update";
            String message = "goods ...";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            channel.close();
            connection.close();
        }
    }
- Consumer 1
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv1 {

        private static final String EXCHANGE_NAME = "test_exchange_topic";
        private static final String QUEUE_NAME = "test_queue_topic_2";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // "#" matches zero or more words, so this queue receives every goods.* event
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[1] Recv " + message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // Acknowledge manually
                        System.out.println("done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean ack = false;
            channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
        }
    }
- Consumer 2
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Recv2 {

        private static final String EXCHANGE_NAME = "test_exchange_topic";
        private static final String QUEUE_NAME = "test_queue_topic_1";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.basicQos(1);
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Exact key: this queue only receives "goods.add", not the producer's "goods.update"
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[2] Recv " + message);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        // Acknowledge manually
                        System.out.println("done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // Turn automatic acknowledgement off
            boolean ack = false;
            channel.basicConsume(QUEUE_NAME, ack, defaultConsumer);
        }
    }
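The binding keys used by the two topic consumers rely on the topic wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The matcher below is a plain-Java sketch of that rule, written for illustration only; it is not the broker's routing code, and `TopicMatchSketch` is a made-up name.

```java
class TopicMatchSketch {

    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if every word was consumed too
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' either absorbs nothing (move past it) or absorbs one more word (stay on it)
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matches(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("goods.#", "goods.update"));       // true
        System.out.println(matches("goods.add", "goods.update"));     // false
        System.out.println(matches("goods.*", "goods.update.price")); // false
        System.out.println(matches("goods.#", "goods.update.price")); // true
    }
}
```

Applied to the example, the "goods.update" message is delivered to the queue bound with "goods.#" and skipped by the queue bound with "goods.add".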
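- Message confirmation
After the producer has sent a message, did it actually reach the RabbitMQ server? By default the producer has no way of knowing. There are two ways to find out:
- The transaction mechanism implemented by the AMQP protocol
- Confirm mode
- Transaction mechanism
It consists of txSelect, txCommit and txRollback:
txSelect: puts the current channel into transaction mode
txCommit: commits the transaction
txRollback: rolls the transaction back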
- Producer
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class TxSend {

        private static final String QUEUE_NAME = "test_queue_tx";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Send the data inside a transaction
            String message = "send tx message";
            try {
                channel.txSelect();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                // Deliberate failure before the commit, to demonstrate the rollback
                int tt = 1 / 0;
                channel.txCommit();
            } catch (Exception e) {
                channel.txRollback();
                System.out.println("failed to send message");
            }
            channel.close();
            connection.close();
        }
    }
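- Consumer
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class TxRecv {

        private static final String QUEUE_NAME = "test_queue_tx";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Automatic acknowledgement; nothing arrives if the producer rolled back
            channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[recv] message: " + message);
                }
            });
        }
    }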
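- Confirm mode
How it works: the producer puts the channel into confirm mode. From then on, every message published on that channel is assigned a unique ID. Once a message has been delivered to all matching queues, the broker sends a confirmation to the producer, so the producer knows the message has reached its destination queues. If the message and the queue are durable, the confirmation is sent only after the message has been written to disk. The delivery-tag field of the confirmation carries the sequence number of the confirmed message. The broker can also set the multiple field of basic.ack, meaning that all messages up to and including this sequence number have been handled.
There are three ways to use it:
(1) Plain: publish one message, then wait for its confirmation
(2) Batch: publish a batch of messages, then wait
(3) Asynchronous confirm: supply a callback method
Plain mode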
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send {

        private static final String QUEUE_NAME = "test_queue_confirm_1";

        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Enable confirm mode; a channel cannot be in transaction mode and confirm mode at once
            channel.confirmSelect();
            String message = "send confirm...";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            // Block until the broker has confirmed (or rejected) the message
            if (!channel.waitForConfirms()) {
                System.out.println("confirm send failed");
            } else {
                System.out.println("confirm send ok ");
            }
            // Close the channel and the connection
            channel.close();
            connection.close();
        }
    }
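Batch mode
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Send2 {

        private static final String QUEUE_NAME = "test_queue_confirm_1";

        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Connection connection = RabbitmqConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Enable confirm mode; a channel cannot be in transaction mode and confirm mode at once
            channel.confirmSelect();
            String message = "send confirm...";
            // Publish the whole batch first
            for (int i = 0; i < 10; i++) {
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            }
            // Then wait once for all of them; on failure the whole batch has to be resent
            if (!channel.waitForConfirms()) {
                System.out.println("confirm send failed");
            } else {
                System.out.println("confirm send ok ");
            }
            // Close the channel and the connection
            channel.close();
            connection.close();
        }
    }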
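The third variant, asynchronous confirm, is listed above without code. Its core is bookkeeping: remember every published sequence number, and remove entries when the broker's ack arrives, honouring the multiple flag. The sketch below shows only that bookkeeping in plain Java so it runs without a broker; `ConfirmTrackerSketch` and its methods are hypothetical names. In a real producer, to my understanding, the sequence number would come from `channel.getNextPublishSeqNo()` and `handleAck` would be invoked from a listener registered with `channel.addConfirmListener(...)`.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class ConfirmTrackerSketch {
    // sequence number -> message body still waiting for a broker confirmation
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    // Call right before publishing, with the sequence number the channel will assign.
    void published(long sequenceNumber, String body) {
        outstanding.put(sequenceNumber, body);
    }

    // Call from the ack callback.
    void handleAck(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Everything up to and including deliveryTag is confirmed
            outstanding.headMap(deliveryTag, true).clear();
        } else {
            outstanding.remove(deliveryTag);
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.published(seq, "send confirm... " + seq);
        }
        tracker.handleAck(3, true);   // confirms 1, 2 and 3 in one go
        tracker.handleAck(5, false);  // confirms only 5
        System.out.println(tracker.outstandingCount()); // 1 (message 4 is still unconfirmed)
    }
}
```

A sorted concurrent map is used because callbacks typically run on a different thread than the publisher, and because the multiple flag needs a cheap "everything up to N" operation. A nack callback would use the same lookup to decide which messages to resend.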