rabbitmq

rabbitmq

MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通信方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的數據)來通信,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程式通過 佇列來通信。佇列的使用除去了接收和傳送應用程式同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ等等。

基本信息

MQ特點

MQ是消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。MQ和JMS類似,但不同的是JMS是SUN JAVA訊息中間件服務的一個標準和API定義,而MQ則是遵循了AMQP協定的具體實現和產品。

使用場景

在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了伺服器的請求回響時間,從而提高了系統的吞吐量。

含義

RabbitMQ是一個在AMQP基礎上完成的,可復用的企業訊息系統。他遵循 Mozilla Public License開源協定。

安裝

4.1) 安裝ERLANG

首先,因為RabbitMQ由ERLANG實現,下載ERLANG 原始碼。

解壓原始碼至ERLANG,然後進入$ERLANG目錄下

安裝依賴包:

Yum install tk

Yum install tcl

Yum install unixODBC

進入$ERLANG.編譯ERLANG

./configure –prefix=/usr/local/erlang

./make

./make install

並將erlang bin目錄加至PATH

4.2) 安裝RabbitMQ

下載RabbitMQ ,解壓至$RMQ。

啟動RabbitMQ

./bin/rabbitmq-server

客戶端

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws.IOException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}

消費者端

public class RabbitMQRecv {
private final static String QUEUE_NAME = "hello";

public static void main(String avg[]) throws.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}

幾個概念

Exchange:交換機,決定了訊息路由規則;

Queue:訊息佇列;

Channel:進行訊息讀寫的通道;

Bind:綁定了Queue和Exchange,意即為符合什麼樣路由規則的訊息,將會放置入哪一個訊息佇列;

訊息持久

1) 將交換機置為可持久;

2) 將通道置為可持久

3) 訊息傳送時設定可持久。

當我們“生產”了一條可持久化的訊息,嘗試中斷MQ服務,啟動消費者獲取訊息,訊息依然能夠恢復。相反,則拋出異常。

入門介紹

基本概念

RabbitMQ是流行的開源訊息佇列系統,用erlang語言開發。RabbitMQ是AMQP(高級訊息佇列協定)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。不過它也只有幾個關鍵概念,這裡簡單介紹。

RabbitMQ的結構圖如下:

rabbitmq rabbitmq

幾個概念說明:

Broker:簡單來說就是訊息佇列伺服器實體。
Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列。
Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞。
vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的許可權分離。
producer:訊息生產者,就是投遞訊息的程式。
consumer:訊息消費者,就是接受訊息的程式。
channel:訊息通道,在客戶端的每個連線里,可建立多個channel,每個channel代表一個會話任務。

訊息佇列的使用過程大概如下:

(1)客戶端連線到訊息佇列伺服器,打開一個channel。
(2)客戶端聲明一個exchange,並設定相關屬性。
(3)客戶端聲明一個queue,並設定相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關係。
(5)客戶端投遞訊息到exchange。

exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列里。

exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設定了routing key為”abc”,那么客戶端提交的訊息,只有設定了key為”abc”的才會投遞到佇列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它採取廣播模式,一個訊息進來時,投遞到與該交換機綁定的所有佇列。

RabbitMQ支持訊息的持久化,也就是數據寫在磁碟上,為了數據安全考慮,我想大多數用戶都會選擇持久化。訊息佇列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)訊息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

套用實際

使用Linux伺服器(ubuntu 9.10 64位),安裝RabbitMQ非常方便。

先運行如下命令安裝erlang:

apt-get install erlang-nox

下載RabbitMQ的安裝包,如下安裝:

dpkg -i rabbitmq-server_2.6.1-1_all.deb

安裝完後,使用

/etc/init.d/rabbitmq-server start|stop|restart

來啟動、停止、重啟rabbitmq。

在正式套用之前,先在RabbitMQ里創建一個vhost,加一個用戶,並設定該用戶的許可權。

使用rabbitmqctl客戶端工具,在根目錄下創建”/pyhtest”這個vhost:

rabbitmqctl add_vhost /pyhtest

創建一個用戶名”pyh”,設定密碼”pyh1234″:

rabbitmqctl add_user pyh pyh1234

設定pyh用戶對/pyhtest這個vhost擁有全部許可權:

rabbitmqctl set_permissions -p /pyhtest pyh “.*” “.*” “.*”

後面三個”*”代表pyh用戶擁有對/pyhtest的配置、寫、讀全部許可權

設定好後,開始編程,用Perl寫一個訊息投遞程式(producer):

#!/usr/bin/perl
use strict;
use Net::RabbitMQ;
use UUID::Tiny;

my $channel = 1000; # channel ID,可以隨意指定,只要不衝突
my $queuename = “pyh_queue”; # 佇列名
my $exchange = “pyh_exchange”; # 交換機名
my $routing_key = “test”; # routing key

my $mq = Net::RabbitMQ->new(); # 創建一個RabbitMQ對象

$mq->connect(“localhost”, { vhost => “/pyhtest”, user => “pyh”, password => “pyh1234″ }); # 建立連線
$mq->channel_open($channel); # 打開一個channel
$mq->exchange_declare($channel, $exchange, {durable => 1}); # 聲明一個持久化的交換機
$mq->queue_declare($channel, $queuename, {durable => 1}); # 聲明一個持久化的佇列
$mq->queue_bind($channel, $queuename, $exchange, $routing_key); # 使用routing key在交換機和佇列間建立綁定

for (my $i=0;$i<10000000;$i++) { # 循環1000萬次
my $string = create_UUID_as_string(UUID_V1); # 產生一條UUID作為訊息主體
$mq->publish($channel, $routing_key, $string, { exchange => $exchange }, { delivery_mode => 2 }); # 將訊息結合key以持久化模式投遞到交換機
}

$mq->disconnect(); # 下線

訊息接受程式(consumer)大概如下:

#!/usr/bin/perl
use strict;
use Net::RabbitMQ;

my $channel = 1001;
my $queuename = “pyh_queue”;
my $mq = Net::RabbitMQ->new();

$mq->connect(“localhost”, { vhost=>”/pyhtest”, user => “pyh”, password => “pyh1234″ });
$mq->channel_open($channel);

while (1) {
my $hashref = $mq->get($channel, $queuename);
last unless defined $hashref;
print $hashref->{message_count}, “: “, $hashref->{body},”\n”;
}

$mq->disconnect();

consumer連線後只要指定佇列就可獲取到訊息。

上述程式共投遞1000萬條訊息,每條訊息36位元組(UUID),打開持久化,共耗時17分多鐘(包括產生UUID的時間),每秒投遞訊息約9500條。測試機器是8G記憶體、8赫茲CPU。

投遞完後,在/var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent目錄,產生2G多的持久化訊息數據。在運行consumer程式後,這些數據都會消失,因為訊息已經被消費了。

集群配置

單機環境下的集群配置:

首先啟動兩個實例,rab和rab2,連線埠分別為9991和9992

RABBITMQ_NODE_PORT=9991 RABBITMQ_NODENAME=rab rabbitmq-server -detached

RABBITMQ_NODE_PORT=9992 RABBITMQ_NODENAME=rab2 rabbitmq-server -detached

二:停止rab2,並將其加入cluster集群中

rabbitmqctl -n rab2 stop_app

rabbitmqctl -n rab2 join_cluster rab@rab(@rab,這裡的rab表示主機名)

重新啟動rab2

RABBITMQ_NODE_PORT=9992 RABBITMQ_NODENAME=rab2 rabbitmq-server -detached

查看集群的狀態

rabbitmqctl cluster_status -n rab

顯示如下信息表示集群配置正常:

Cluster status of node rab@rab ...
[{nodes,[{disc,[rab2@rab,rab@rab]}]},
{running_nodes,[rab@rab]},
{partitions,[]}]
...done.

相關詞條

熱門詞條

聯絡我們