[RabbitMQ] Work Queue
上一篇分享了RabbitMQ的基礎知識, 包含一些名詞解釋、message從生產者到消費者的流程、生產者與消費者的java程式. 接下來要講的是Work Queue模型, Work Queue, 顧名思義就是生產者產生訊息接著由眾多消費者消費的模型.
我們先將上一篇的程式稍微改一下
讓3個消費者先執行, 接著生產者產生9筆message.
把生產者內的這行註解掉, 以免資訊太多太亂.
接著執行看結果
可以得知woek queue 有下列幾種特性 (round-robin)
當然不, 消費者需要花費時間處理消息, 萬一在處理過程中機器當機那該怎麼辦? 消息豈不就消失了??
是的, 所以RabbitMQ提供了一種確認機制, 假若消費者已將消息處理完則通知RabbitMQ, 同時RabbitMQ才能將該消息從Queue內刪除, 設定方式很簡單, 只需修改下列幾個地方.
消費者端程式改為手動確認 (第二個參數為true代表採用自動確認)
然後我們將消費者內的程式改為訊息為0~7就將其確認, 訊息為8就reject
我們先執行生產者, 讓queue內涵9筆消息
管理網頁顯示已經有9筆消息已準備好被消費
接著我們執行消費者的程式
執行結果可以發現8這條消息被reject了且被重新加入queue內 (第二個參數表示重新加入queue)

從管理網頁可以看到有一筆訊息未確認

因為我們將requeue參數設定為true, 所以訊息又被重新加入queue內

下一篇消息的持久化
2018/01/14 VictorHsiao
我們先將上一篇的程式稍微改一下
public static void main(String[] args) throws Exception {
String host = "192.168.1.103";
Thread consumerThread1 = new Thread(new Consumer(host, "consumerThread1"));
Thread consumerThread2 = new Thread(new Consumer(host, "consumerThread2"));
Thread consumerThread3 = new Thread(new Consumer(host, "consumerThread3"));
consumerThread1.start();
consumerThread2.start();
consumerThread3.start();
Thread.sleep(3000);
Thread producerThread = new Thread(new Producer(host));
producerThread.start();
System.out.println("Finish");
}
讓3個消費者先執行, 接著生產者產生9筆message.
把生產者內的這行註解掉, 以免資訊太多太亂.
System.out.println("Producer Sent " + i);
接著執行看結果
可以得知woek queue 有下列幾種特性 (round-robin)
- 一個訊息只會被一個消費者接收
- 訊息是平均分配給消費者的
- 消費者處理完訊息後才會接收下個訊
當然不, 消費者需要花費時間處理消息, 萬一在處理過程中機器當機那該怎麼辦? 消息豈不就消失了??
是的, 所以RabbitMQ提供了一種確認機制, 假若消費者已將消息處理完則通知RabbitMQ, 同時RabbitMQ才能將該消息從Queue內刪除, 設定方式很簡單, 只需修改下列幾個地方.
消費者端程式改為手動確認 (第二個參數為true代表採用自動確認)
channel.basicConsume(QUEUE_NAME, false, consumer);
然後我們將消費者內的程式改為訊息為0~7就將其確認, 訊息為8就reject
public void run() {
try {
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(name + " receive message: " + message + " Thread ID: " + Thread.currentThread().getId());
try {
Thread.currentThread().sleep(1000); //模擬做很繁重的工作
if(!"8".equals(message)) {
channel.basicAck(envelope.getDeliveryTag(), false);
}else {
channel.basicReject(envelope.getDeliveryTag(), true);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
synchronized (obj) {
obj.wait();
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if(channel != null) channel.close();
if(connection != null) connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
我們先執行生產者, 讓queue內涵9筆消息
public static void main(String[] args) throws Exception {
String host = "192.168.1.103";
//Thread consumerThread1 = new Thread(new Consumer(host, "consumerThread1"));
Thread producerThread = new Thread(new Producer(host));
//consumerThread1.start();
producerThread.start();
System.out.println("GG");
}
管理網頁顯示已經有9筆消息已準備好被消費
接著我們執行消費者的程式
public static void main(String[] args) throws Exception {
String host = "192.168.1.103";
Thread consumerThread1 = new Thread(new Consumer(host, "consumerThread1"));
//Thread producerThread = new Thread(new Producer(host));
consumerThread1.start();
//producerThread.start();
System.out.println("GG");
}
執行結果可以發現8這條消息被reject了且被重新加入queue內 (第二個參數表示重新加入queue)
channel.basicReject(envelope.getDeliveryTag(), true);
從管理網頁可以看到有一筆訊息未確認
因為我們將requeue參數設定為true, 所以訊息又被重新加入queue內
下一篇消息的持久化
2018/01/14 VictorHsiao
留言
張貼留言