[RabbitMQ] Work Queue

上一篇分享了RabbitMQ的基礎知識, 包含一些名詞解釋、message從生產者到消費者的流程、生產者與消費者的java程式. 接下來要講的是Work Queue模型, Work Queue, 顧名思義就是生產者產生訊息接著由眾多消費者消費的模型.


我們先將上一篇的程式稍微改一下

 public static void main(String[] args) throws Exception {  
           String host = "192.168.1.103";  
           Thread consumerThread1 = new Thread(new Consumer(host, "consumerThread1"));  
           Thread consumerThread2 = new Thread(new Consumer(host, "consumerThread2"));  
           Thread consumerThread3 = new Thread(new Consumer(host, "consumerThread3"));  
           consumerThread1.start();  
           consumerThread2.start();  
           consumerThread3.start();  
           Thread.sleep(3000);  
           Thread producerThread = new Thread(new Producer(host));  
           producerThread.start();  
           System.out.println("Finish");  
      }  

讓3個消費者先執行, 接著生產者產生9筆message.
把生產者內的這行註解掉, 以免資訊太多太亂.
 System.out.println("Producer Sent " + i);  

接著執行看結果



可以得知woek queue 有下列幾種特性 (round-robin)

  1. 一個訊息只會被一個消費者接收
  2. 訊息是平均分配給消費者的
  3. 消費者處理完訊息後才會接收下個訊
萬無一失了嗎???
當然不, 消費者需要花費時間處理消息, 萬一在處理過程中機器當機那該怎麼辦? 消息豈不就消失了??
是的, 所以RabbitMQ提供了一種確認機制, 假若消費者已將消息處理完則通知RabbitMQ, 同時RabbitMQ才能將該消息從Queue內刪除, 設定方式很簡單, 只需修改下列幾個地方.

消費者端程式改為手動確認 (第二個參數為true代表採用自動確認)
 channel.basicConsume(QUEUE_NAME, false, consumer);  

然後我們將消費者內的程式改為訊息為0~7就將其確認, 訊息為8就reject
 public void run() {  
                try {  
                     channel = connection.createChannel();  
                     channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
                     com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {  
                     @Override  
                     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,  
                               byte[] body) throws IOException {  
                          String message = new String(body, "UTF-8");  
                          System.out.println(name + " receive message: " + message + " Thread ID: " + Thread.currentThread().getId());  
                          try {  
                               Thread.currentThread().sleep(1000); //模擬做很繁重的工作  
                               if(!"8".equals(message)) {  
                                    channel.basicAck(envelope.getDeliveryTag(), false);  
                               }else {  
                                    channel.basicReject(envelope.getDeliveryTag(), true);  
                               }  
                          } catch (InterruptedException e) {  
                               e.printStackTrace();  
                          }  
                     }  
                };  
                channel.basicConsume(QUEUE_NAME, false, consumer);  
                synchronized (obj) {  
                     obj.wait();  
                }  
                } catch (Exception e) {  
                     e.printStackTrace();  
                }finally {  
                          try {  
                               if(channel != null) channel.close();  
                               if(connection != null) connection.close();  
                          } catch (Exception e) {  
                               e.printStackTrace();  
                          }  
                }  
           }  

我們先執行生產者, 讓queue內涵9筆消息
 public static void main(String[] args) throws Exception {  
           String host = "192.168.1.103";  
           //Thread consumerThread1 = new Thread(new Consumer(host, "consumerThread1"));  
           Thread producerThread = new Thread(new Producer(host));  
           //consumerThread1.start();  
           producerThread.start();  
           System.out.println("GG");  
      }  

管理網頁顯示已經有9筆消息已準備好被消費


接著我們執行消費者的程式
 public static void main(String[] args) throws Exception {  
           String host = "192.168.1.103";  
           Thread consumerThread1 = new Thread(new Consumer(host, "consumerThread1"));  
           //Thread producerThread = new Thread(new Producer(host));  
           consumerThread1.start();  
           //producerThread.start();  
           System.out.println("GG");  
      }  

執行結果可以發現8這條消息被reject了且被重新加入queue內 (第二個參數表示重新加入queue)
 channel.basicReject(envelope.getDeliveryTag(), true);  



從管理網頁可以看到有一筆訊息未確認


因為我們將requeue參數設定為true, 所以訊息又被重新加入queue內



下一篇消息的持久化
2018/01/14   VictorHsiao

留言

這個網誌中的熱門文章

[RabbitMQ] 基本教學