Rabbitmq
一: 訊息佇列介紹
1.介紹
訊息佇列就是基礎資料結構中的 "先進先出" 的一種資料機構。想一下,生活中買東西需要排隊,先排隊的人先買消費,就是典型的 "先進先出"。
# 擴充套件
redis: 可以作為簡單的訊息佇列
celery: 本事就是基於訊息佇列進行的封裝。
2.MQ解決了什麼問題
MQ是一直存在,不過隨著微服務架構的流行,成了解決微服務和微服務之間通訊的常用工具。
# 擴充套件
1.兩個服務之間呼叫的方式:
1.restful七層協定oss(http協定)
2.rpc tcp socket層(遠端過程呼叫)
2.不管是使用restful還是rpc,都涉及到使用同步還是非同步:
1.非同步: client使用rpc和server互動,client使用非同步,不管有沒有執行成功,就不停的非同步的提交資料,資料在server訊息佇列排著隊,等待著消費。
1.應用的解耦
1.以電商應用為例,應用中有訂單系統,庫存系統,物流系統,支付系統。使用者建立訂單後,如果耦合呼叫庫存系統,物流系統,支付系統,任何一個子系統出現了故障,都會造成下單操作異常。
2.當轉變成基於佇列的方式後,系統間呼叫的問題會減少很多,比如物流系統因為發生故障,需要幾分鐘來修復。在這幾分鐘的時間裡,物流系統要處理的記憶體被快取在訊息佇列中,使用者的下單操作可以正常完成。當物流系統恢復後,繼續處理訂單資訊即可,訂單使用者感受不到物流系統的故障。提升系統的可用性。
2.流量削峰
1.舉個列子,如果訂單系統最多能處理一萬次訂單,這個處理能力應付正常時段的下單時綽綽有餘,正常時段我們下單一秒後就能返回結果。但是在高峰期,如果有兩萬次下單作業系統是處理不了的,只能限制訂單超過一萬後不允許使用者下單。
2.使用訊息佇列做快取,我們可以取消這個限制,把一秒內下的訂單分散成一段時間來處理,這時有些使用者可能在下單十幾秒後才能收到下單成功的操作,但是比不能下單的體驗要好。
# 結:
1.通常下比如有兩萬訂單,這時我們server肯定消費不過來,我們將兩萬丟到訊息佇列中,進行消費即可。 --- 就叫流量消峰 = 如: 雙十一,訊息佇列 多消費
3.訊息分發(釋出訂閱: 觀察者模式)
多個服務對資料感興趣,只需要監聽同一類訊息即可處理。
列如A產生資料,B對資料感興趣。如果沒有訊息佇列A每次處理完需要呼叫一下B服務。過了一段時間C對資料也感興趣,A就需要改程式碼,呼叫B服務,呼叫C服務。只要有服務需要,A服務都要改動程式碼。很不方便。
xxxxxxxxxx 有了訊息佇列後,A只管傳送一次訊息,B對訊息感興趣,只需要監聽訊息。C感興趣,C也去監聽訊息。A服務作為基礎服務完全不需要有改動。
4.非同步訊息(celery就是對訊息佇列的封裝)
xxxxxxxxxx 有些服務間呼叫是非同步的: 1.例如A呼叫B,B需要花費很長時間執行,但是A需要知道B什麼時候可以執行完,以前一般有兩種方式,A過了一段時間去呼叫B的查詢api是否完成。 2.或者A提供一個callback api,B執行完之後呼叫api通知A服務。這兩種方式都不是很優雅。python
1.使用訊息匯流排,可以很方便解決這個問題,A呼叫B服務後,只需要監聽B處理完成的訊息,當B處理完成後,會傳送一條訊息給MQ,MQ會將此訊息轉發給A服務。
2.這樣A服務既不用迴圈呼叫B的查詢api,也不用提供callback api。同樣B服務也不用做這些操作。A服務還能及時的得到非同步處理成功的訊息。
3.常見訊息佇列及比較
xxxxxxxxxx RabbitMQ: 支援持久化,斷電後,重啟,資料是不會丟的。 1. 吞吐量小: 幾百萬都是沒問題的,訊息確認: 我告訴你,我消費完了,你在刪 2.應用場景: 訂單,對訊息可靠性有要求,就用它 Kafka: 吞吐量高,注重高吞吐量,不注重訊息的可靠性 1.你拿走就沒了,消費過程崩了,就沒了。 2.應用場景,資料量特別大。 # 結論: 1.Kafka在於分散式架構,RabbitMQ基於AMQP協定來實現,RocketMQ/思路來源於Kafka,改成了主從結構,在事務性可靠性方面做了優化。 2.廣泛來說,電商,金融等對事物性要求很高的,可以考慮RabbitMQ,對效能要求或吞吐量高的可考慮Kafka。python
二:Rabbitmq安裝
安裝兩種
1.伺服器端原生安裝
1 原生安裝
-安裝擴充套件epel源
-yum -y install erlang
-yum -y install rabbitmq-server
# 查詢是否安裝
rpm -qa rabbitmq-server
-systemctl start rabbitmq-server
# 以上也有web管理頁面,只不過需要組態檔設定。
# 第一種方式使用者端連線伺服器端,可以不用設定使用者和密碼,只需要ip連線。第二種方式則需要設定使用者名稱和密碼。
2.伺服器端docker拉取
2 docker拉取
-docker pull rabbitmq:management(自動開啟了web管理介面)
-docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management # 直接 run 如果沒有rabbitmq就會自動拉
"""
docker run -di --name: 指定rabbitmq
-e: 環境變數
-e RABBITMQ_DEFAULT_USER=admin:使用者名稱
-e RABBITMQ_DEFAULT_PASS=admin:密碼
-p 15672:15672: rabbitmq web管理介面埠
-p 5672:5672: rabbitmq預設的監聽埠
"""
docker ps
http://47.101.159.222:15672/
3.Rabbitmq視覺化介面建立使用者(設定使用者和密碼)
4.命令建立Rabbitmq使用者(設定使用者和密碼)
4 建立使用者
rabbitmqctl add_user lqz 123
5 分配許可權
# 設定使用者為admin角色
rabbitmqctl set_user_tags lqz administrator
# 設定許可權
rabbitmqctl set_permissions -p "/" lqz ".*" ".*" ".*"
# rabbitmqctl set_permissions -p "/" 使用者名稱 ".*" ".*" ".*"
三:使用者端安裝
pip3 install pika
四: 基本使用(生產者消費者模型)
對於RabbitMQ來說,生產和消費不再針對記憶體裡的一個Queue物件,而是某臺伺服器上的RabbitMQ Server實現的訊息佇列。
* 生產者
import pika
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
# 宣告佇列,如果佇列不存在,則建立佇列,如果佇列存在,則不建立
channel.queue_declare(queue='datalog')
# 生產者向佇列中放一條訊息
channel.basic_publish(exchange='', # 交換機,如果不指定,則使用預設的交換機, 預設的交換機
routing_key='datalog', # 佇列名稱
body='zll nb!' # 傳送的訊息
)
print("Sent 'Hello World!'")
# 關閉連線
connection.close()
import pika, sys, os
def main():
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
"""消費者也宣告了佇列,因為如果是消費者先啟動,那麼佇列就不存在了,消費者就無法消費訊息了,所以消費者也要宣告佇列"""
channel.queue_declare(queue='datalog')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 消費者從佇列queue指定的消費佇列hello中取訊息,拿到資料了之前將hello佇列的資料丟到callback裡面,如果佇列中沒有訊息,則會一直等待,直到有訊息為止
# auto_ack=True:自動確認訊息,如果不設定為True,那麼訊息會一直處於未確認狀態,即使消費者已經消費了訊息,訊息也不會從佇列中刪除,這樣就會造成訊息的重複消費,所以一般都會設定為True
# auto_ack=true: 佇列接收到既直接確認,就會刪除佇列中的訊息,不會管後面資料會不會消費完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=True) # 預設為false,不自動確認訊息,需要手動確認
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費訊息,如果佇列中沒有訊息,那麼就會一直等待,直到有訊息為止,如果佇列中有訊息,那麼就會消費訊息
if __name__ == '__main__':
main()
五: 訊息確認機制 (訊息安全之ack)
# auto_ack: 自動確認訊息(佇列接收到就會確認消費,會丟失資料的可能性) 預設為false
auto_ack=true: 佇列接收到既直接確認,就會刪除佇列中的訊息,不會管後面資料會不會消費完。
auto_ack=false: 設定為false的情況,那麼訊息會一直處於未確認狀態,即使消費者已經消費了訊息,訊息也不會從佇列中刪除,這樣就會造成訊息的重複消費
# ch.basic_ack: 消費完後,自動確認消費(可靠性,保證資料都完整的消費): 常用推薦
ch.basic_ack(delivery_tag=method.delivery_tag): 真正的將訊息消費完了後,再發確認,就會刪除掉佇列中的訊息。
* 生產者
import pika
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
# 宣告佇列,如果佇列不存在,則建立佇列,如果佇列存在,則不建立
channel.queue_declare(queue='datalog')
# 生產者向佇列中放一條訊息
channel.basic_publish(exchange='', # 交換機,如果不指定,則使用預設的交換機, 預設的交換機
routing_key='datalog', # 佇列名稱
body='zll nb!' # 傳送的訊息
)
print("Sent 'Hello World!'")
# 關閉連線
connection.close()
import pika, sys, os
def main():
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
"""消費者也宣告了佇列,因為如果是消費者先啟動,那麼佇列就不存在了,消費者就無法消費訊息了,所以消費者也要宣告佇列"""
channel.queue_declare(queue='datalog')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 真正的將訊息消費完了,再發確認
ch.basic_ack(delivery_tag=method.delivery_tag)
# 消費者從佇列queue指定的消費佇列hello中取訊息,拿到資料了之前將hello佇列的資料丟到callback裡面,如果佇列中沒有訊息,則會一直等待,直到有訊息為止
# auto_ack=True:自動確認訊息,如果不設定為True,那麼訊息會一直處於未確認狀態,即使消費者已經消費了訊息,訊息也不會從佇列中刪除,這樣就會造成訊息的重複消費,所以一般都會設定為True
# auto_ack=true: 佇列接收到既直接確認,就會刪除佇列中的訊息,不會管後面資料會不會消費完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 預設為false,不自動確認訊息,需要手動確認
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費訊息,如果佇列中沒有訊息,那麼就會一直等待,直到有訊息為止,如果佇列中有訊息,那麼就會消費訊息
if __name__ == '__main__':
main()
六: 持久化(訊息安全之durable持久化)
1.什麼是rabbitmq持久化?
資料支援持久化,執行過程中,rabbitmq宕機了,在重新啟動起來,如果佇列消費訊息沒被消費,那麼就還是會存在。
2.設定佇列持久化
# 在建立佇列的時候增加durable=True設定佇列持久化,如果rabbitmq服務重啟,佇列不會丟失
channel.queue_declare(queue='datalog',durable=True)
3.設定訊息持久化
# 在釋出訊息的時候增加properties設定訊息持久化,如果rabbitmq服務停止,重啟後,訊息還在, 1:非持久化,2:持久化,預設為1
properties=pika.BasicProperties(delivery_mode=2,)
# 注意:
1.沒加持久化設定之前的佇列不會支援持久化,需要加持久化設定之後重新建立。
* 生產者
import pika
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
# 宣告佇列,如果佇列不存在,則建立佇列,如果佇列存在,則不建立
channel.queue_declare(queue='datalog', durable=True) # durable=True: 佇列持久化,如果rabbitmq服務停止,重啟後,佇列還在
# 生產者向佇列中放一條訊息
channel.basic_publish(exchange='', # 交換機,如果不指定,則使用預設的交換機, 預設的交換機
routing_key='datalog', # 佇列名稱
body='zll nb!', # 傳送的訊息
properties=pika.BasicProperties(
delivery_mode=2, # 訊息持久化,如果rabbitmq服務停止,重啟後,訊息還在, 1:非持久化,2:持久化
)
)
print("Sent 'Hello World!'")
# 關閉連線
connection.close()
import pika, sys, os
def main():
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
"""消費者也宣告了佇列,因為如果是消費者先啟動,那麼佇列就不存在了,消費者就無法消費訊息了,所以消費者也要宣告佇列"""
channel.queue_declare(queue='datalog', durable=True) # durable=True: 佇列持久化,如果rabbitmq服務重啟,佇列不會丟失
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 真正的將訊息消費完了,再發確認
ch.basic_ack(delivery_tag=method.delivery_tag)
# 消費者從佇列queue指定的消費佇列hello中取訊息,拿到資料了之前將hello佇列的資料丟到callback裡面,如果佇列中沒有訊息,則會一直等待,直到有訊息為止
# auto_ack=True:自動確認訊息,如果不設定為True,那麼訊息會一直處於未確認狀態,即使消費者已經消費了訊息,訊息也不會從佇列中刪除,這樣就會造成訊息的重複消費,所以一般都會設定為True
# auto_ack=true: 佇列接收到既直接確認,就會刪除佇列中的訊息,不會管後面資料會不會消費完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 預設為false,不自動確認訊息,需要手動確認
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費訊息,如果佇列中沒有訊息,那麼就會一直等待,直到有訊息為止,如果佇列中有訊息,那麼就會消費訊息
if __name__ == '__main__':
main()
七: 閒置消費
1.什麼是閒置消費?
1.正常情況如果有多個消費者,是按照順序第一個訊息給第一個消費者,第二個訊息給第二個消費者,以此類推,只能按照順序。
2.但是可能第一個消費的消費者處理訊息很耗時,一直沒結束,此時就可以讓第二個消費者優先獲取閒置的訊息,次方法就稱之為"閒置消費"。
2.設定閒置消費
# 消費者設定,每次只接收一條訊息,處理完了再接收下一條,這樣可以保證訊息的順序性,不會出現訊息亂序的情況
channel.basic_qos(prefetch_count=1) # 1代表每次只接收一條訊息,接收完了再接收下一條
# 缺點:
1.但是會降低效率,因為每次只處理一條訊息,如果訊息處理很快,那麼效率就會降低
* 生產者
import pika
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
# 宣告佇列,如果佇列不存在,則建立佇列,如果佇列存在,則不建立
channel.queue_declare(queue='datalog', durable=True) # durable=True: 佇列持久化,如果rabbitmq服務停止,重啟後,佇列還在
# 生產者向佇列中放一條訊息
channel.basic_publish(exchange='', # 交換機,如果不指定,則使用預設的交換機, 預設的交換機
routing_key='datalog', # 佇列名稱
body='zll nb!', # 傳送的訊息
properties=pika.BasicProperties(
delivery_mode=2, # 訊息持久化,如果rabbitmq服務停止,重啟後,訊息還在, 1:非持久化,2:持久化
)
)
print("Sent 'Hello World!'")
# 關閉連線
connection.close()
import time
import pika, sys, os
def main():
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
"""消費者也宣告了佇列,因為如果是消費者先啟動,那麼佇列就不存在了,消費者就無法消費訊息了,所以消費者也要宣告佇列"""
channel.queue_declare(queue='datalog', durable=True) # durable=True: 佇列持久化,如果rabbitmq服務重啟,佇列不會丟失
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(50) # 模擬處理任務,耗時50秒
# 真正的將訊息消費完了,再發確認
ch.basic_ack(delivery_tag=method.delivery_tag)
# 閒置消費
channel.basic_qos(prefetch_count=1) # 每次只接收一條訊息,處理完了再接收下一條,這樣可以保證訊息的順序性,不會出現訊息亂序的情況,但是會降低效率,因為每次只處理一條訊息,如果訊息處理很快,那麼效率就會降低
# 消費者從佇列queue指定的消費佇列hello中取訊息,拿到資料了之前將hello佇列的資料丟到callback裡面,如果佇列中沒有訊息,則會一直等待,直到有訊息為止
# auto_ack=True:自動確認訊息,如果不設定為True,那麼訊息會一直處於未確認狀態,即使消費者已經消費了訊息,訊息也不會從佇列中刪除,這樣就會造成訊息的重複消費,所以一般都會設定為True
# auto_ack=true: 佇列接收到既直接確認,就會刪除佇列中的訊息,不會管後面資料會不會消費完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 預設為false,不自動確認訊息,需要手動確認
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費訊息,如果佇列中沒有訊息,那麼就會一直等待,直到有訊息為止,如果佇列中有訊息,那麼就會消費訊息
if __name__ == '__main__':
main()
import pika, sys, os
def main():
# 建立連線物件
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq伺服器ip地址
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials)) # host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
# 建立channel物件,用於傳送訊息,接收訊息,宣告佇列
channel = connection.channel() # connection.channel(): 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
"""消費者也宣告了佇列,因為如果是消費者先啟動,那麼佇列就不存在了,消費者就無法消費訊息了,所以消費者也要宣告佇列"""
channel.queue_declare(queue='datalog', durable=True) # durable=True: 佇列持久化,如果rabbitmq服務重啟,佇列不會丟失
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 真正的將訊息消費完了,再發確認
ch.basic_ack(delivery_tag=method.delivery_tag)
# 閒置消費
channel.basic_qos(prefetch_count=1) # 每次只接收一條訊息,處理完了再接收下一條,這樣可以保證訊息的順序性,不會出現訊息亂序的情況,但是會降低效率,因為每次只處理一條訊息,如果訊息處理很快,那麼效率就會降低
# 消費者從佇列queue指定的消費佇列hello中取訊息,拿到資料了之前將hello佇列的資料丟到callback裡面,如果佇列中沒有訊息,則會一直等待,直到有訊息為止
# auto_ack=True:自動確認訊息,如果不設定為True,那麼訊息會一直處於未確認狀態,即使消費者已經消費了訊息,訊息也不會從佇列中刪除,這樣就會造成訊息的重複消費,所以一般都會設定為True
# auto_ack=true: 佇列接收到既直接確認,就會刪除佇列中的訊息,不會管後面資料會不會消費完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 預設為false,不自動確認訊息,需要手動確認
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 開始消費訊息,如果佇列中沒有訊息,那麼就會一直等待,直到有訊息為止,如果佇列中有訊息,那麼就會消費訊息
if __name__ == '__main__':
main()
八: 釋出訂閱(fanout)
釋出訂閱: 可以有多個訂閱者來訂閱釋出者的訊息
# fanout:不需要routing_key,只需要將訊息傳送到交換機中,交換機會將訊息傳送到所有繫結到它的佇列中
# 實現釋出訂閱邏輯
1.釋出者 P 將訊息傳送到 X 交換機上面,
2.C1,C2,多個訂閱者隨機建立出多個佇列,將訂閱者佇列繫結給 X 交換機,
3.X 交換機通過佇列將資料傳送給所有繫結 X 交換機的訂閱者。
import pika
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials))
# 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
channel = connection.channel()
# 不需要宣告佇列,因為生產者不需要將訊息放到佇列中,只需要將訊息傳送到交換機中即可
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 宣告交換機,交換機型別為fanout
# exchange_type的三種型別:
# 1、direct:根據routing_key將訊息放到對應的佇列中
# 2、topic:根據routing_key和binding_key將訊息放到對應的佇列中
# 3、fanout:不需要routing_key,只需要將訊息傳送到交換機中,交換機會將訊息傳送到所有繫結到它的佇列中
message = "info: Hello World!"
# 傳送訊息到交換機中
channel.basic_publish(exchange='logs', routing_key='', body=message) # 傳送訊息到交換機
connection.close()
import pika
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials))
# 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
channel = connection.channel()
# 不需要宣告佇列,因為生產者不需要將訊息放到佇列中,只需要將訊息傳送到交換機中即可
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 宣告交換機,交換機型別為fanout
# 宣告一個隨機佇列,exclusive=True表示這個佇列只能被當前連線使用,當連線關閉時,佇列會被刪除,exclusive=True是為了防止多個消費者同時消費一個佇列,導致訊息重複消費
result = channel.queue_declare(queue='', exclusive=True)
# 獲取隨機佇列的名稱,隨機的意義是什麼: 每次執行程式都會建立一個新的佇列,這樣就不會有多個消費者同時消費同一個佇列中的訊息,這樣就可以實現訊息的負載均衡,每個消費者都會平均的消費佇列中的訊息
queue_name = result.method.queue
# 預設會建立一個隨機佇列,佇列名稱是隨機的。這個佇列只能被當前連線使用,當連線關閉時,佇列會被刪除。
# 也可以指定佇列名稱,但是要確保佇列名稱是唯一的,不然會報錯
print(queue_name)
channel.queue_bind(exchange='logs', queue=queue_name) # 將佇列繫結到交換機上,交換機型別為fanout,所以不需要指定routing_key,交換機會將訊息傳送到所有繫結到它上面的佇列
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
九:關鍵字(direct)
direct:根據routing_key將訊息放到對應的佇列中
1.關鍵字
1.將隨機佇列繫結到交換機上,routing_key指定路由鍵,這裡指定的是key,
2.表示只有路由鍵為info的訊息才會被傳送到該隨機佇列中,也就是說只有生產者傳送的訊息的路由鍵為key的訊息才會被消費。
# 總結:
將隨機佇列繫結到交換機上,routing_key為指定消費交換機的佇列名稱,從而實現指定消費,然後將訊息從繫結的交換機的佇列獲取, 消費。
routing_key監聽的佇列名稱
import pika
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials))
# 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
channel = connection.channel()
# 不需要宣告佇列,因為生產者不需要將訊息放到佇列中,只需要將訊息傳送到交換機中即可
channel.exchange_declare(exchange='zll', exchange_type='direct') # 宣告交換機,交換機型別為direct
# exchange_type的三種型別:
# 1、direct:根據routing_key將訊息放到對應的佇列中
# 2、topic:根據routing_key和binding_key將訊息放到對應的佇列中
# 3、fanout:不需要routing_key,只需要將訊息傳送到交換機中,交換機會將訊息傳送到所有繫結到它的佇列中
message = "info: Hello World!"
# 傳送訊息到交換機中
channel.basic_publish(exchange='zll', routing_key='bnb', body=message) # routing_key為bnb,訊息會被傳送到bnb佇列中,如果沒有bnb佇列,訊息會被丟棄,因為沒有佇列可以接收訊息
connection.close()
import pika
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials))
# 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
channel = connection.channel()
# 不需要宣告佇列,因為生產者不需要將訊息放到佇列中,只需要將訊息傳送到交換機中即可
channel.exchange_declare(exchange='zll', exchange_type='direct') # 宣告交換機,交換機型別為fanout
# 宣告一個隨機佇列,exclusive=True表示這個佇列只能被當前連線使用,當連線關閉時,佇列會被刪除,exclusive=True是為了防止多個消費者同時消費一個佇列,導致訊息重複消費
result = channel.queue_declare(queue='', exclusive=True)
# 獲取隨機佇列的名稱,隨機的意義是什麼: 每次執行程式都會建立一個新的佇列,這樣就不會有多個消費者同時消費同一個佇列中的訊息,這樣就可以實現訊息的負載均衡,每個消費者都會平均的消費佇列中的訊息
queue_name = result.method.queue
# 預設會建立一個隨機佇列,佇列名稱是隨機的。這個佇列只能被當前連線使用,當連線關閉時,佇列會被刪除。
# 也可以指定佇列名稱,但是要確保佇列名稱是唯一的,不然會報錯
print(queue_name)
# 將佇列繫結到交換機上,routing_key指定路由鍵,這裡指定的是info,表示只有路由鍵為info的訊息才會被傳送到該隨機佇列中,也就是說只有生產者傳送的訊息的路由鍵為info的訊息才會被消費。
channel.queue_bind(exchange='zll', queue=queue_name, routing_key='nb') # 將佇列繫結到交換機上,routing_key為指定消費交換機的佇列名稱,從而實現指定消費,然後將訊息從繫結到交換機的佇列獲取
channel.queue_bind(exchange='zll', queue=queue_name, routing_key='bnb')
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
十:模糊匹配(topic)
topic:根據routing_key和binding_key將訊息放到對應的佇列中
# 模糊匹配的關鍵
# : 表示後面可以跟任意字元
* : 表示後面只能跟一個單詞
import pika
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials))
# 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
channel = connection.channel()
# 不需要宣告佇列,因為生產者不需要將訊息放到佇列中,只需要將訊息傳送到交換機中即可
channel.exchange_declare(exchange='aaa', exchange_type='topic') # 宣告交換機,交換機型別為direct
# exchange_type的三種型別:
# 1、direct:根據routing_key將訊息放到對應的佇列中
# 2、topic:根據routing_key和binding_key將訊息放到對應的佇列中
# 3、fanout:不需要routing_key,只需要將訊息傳送到交換機中,交換機會將訊息傳送到所有繫結到它的佇列中
message = "info: Hello World!"
# 傳送訊息到交換機中
channel.basic_publish(exchange='aaa', routing_key='bnb.xxxx', body=message) # routing_key為bnb,訊息會被傳送到bnb佇列中,如果沒有bnb佇列,訊息會被丟棄,因為沒有佇列可以接收訊息
connection.close()
import pika
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials))
# 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
channel = connection.channel()
# 不需要宣告佇列,因為生產者不需要將訊息放到佇列中,只需要將訊息傳送到交換機中即可
channel.exchange_declare(exchange='aaa', exchange_type='topic') # 宣告交換機,交換機型別為fanout
# 宣告一個隨機佇列,exclusive=True表示這個佇列只能被當前連線使用,當連線關閉時,佇列會被刪除,exclusive=True是為了防止多個消費者同時消費一個佇列,導致訊息重複消費
result = channel.queue_declare(queue='', exclusive=True)
# 獲取隨機佇列的名稱,隨機的意義是什麼: 每次執行程式都會建立一個新的佇列,這樣就不會有多個消費者同時消費同一個佇列中的訊息,這樣就可以實現訊息的負載均衡,每個消費者都會平均的消費佇列中的訊息
queue_name = result.method.queue
# 預設會建立一個隨機佇列,佇列名稱是隨機的。這個佇列只能被當前連線使用,當連線關閉時,佇列會被刪除。
# 也可以指定佇列名稱,但是要確保佇列名稱是唯一的,不然會報錯
print(queue_name)
# 將隨機佇列繫結到交換機上,routing_key為指定消費交換機的佇列名稱,從而實現指定消費,然後將訊息從繫結的交換機的佇列獲取, routing_key監聽的佇列名稱
channel.queue_bind(exchange='aaa', queue=queue_name, routing_key='nb') # 將佇列繫結到交換機上,routing_key為指定消費交換機的佇列名稱,從而實現指定消費,然後將訊息從繫結到交換機的佇列獲取
channel.queue_bind(exchange='aaa', queue=queue_name, routing_key='bnb.#')
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
十一:通過rabbitmq實現rpc(基於RabbitMQ封裝RPC)
# 通過RabbitMQ實現rpc
# 實現邏輯
1.伺服器端啟動接收訊息,監聽queue佇列。
2.實列化使用者端,呼叫call方法,將訊息屬性內包含: 1.回撥函數隨機佇列,接收伺服器端返回結果,伺服器端會將結果傳送到這個佇列。2.客戶但的隨機uuid,標識唯一訊息。然後將body訊息傳送給伺服器端。
3.使用者端,釋出完訊息後,進入非阻塞狀態,如果沒有接收到伺服器端返回的結果,會一直等待,直到收到結果,然後返回結果。
4.伺服器端接收queue佇列訊息,呼叫函數將訊息進行處理,獲取裴波那契數列。
5.然後伺服器端進行釋出,將訊息傳送到使用者端的回撥函數佇列,使用者端的uuid。
6.使用者端監聽接收佇列訊息,呼叫函數處理,判斷唯一uuid,確認body,然後成功收到訊息並返回。
import pika
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222',
credentials=credentials))
# 在連線上建立一個頻道,這個頻道就是我們要通過它來傳送,接收訊息的物件,類似於TCP中的socket物件,我們通過它來收發訊息,宣告佇列
channel = connection.channel()
channel.queue_declare(queue='rpc_queue') # 宣告佇列,如果佇列不存在,會自動建立
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else: # 遞迴呼叫,計算斐波那契數列
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body): # ch為頻道,method為方法,props為屬性,body為訊息體
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to, # 將訊息傳送到使用者端的回撥函數
properties=pika.BasicProperties(correlation_id = \
props.correlation_id), # 將使用者端的correlation_id傳遞給使用者端
body=str(response))
# 傳送ack訊息,告訴rabbitmq,訊息已經被處理
ch.basic_ack(delivery_tag=method.delivery_tag)
# 每次只接收一個訊息
channel.basic_qos(prefetch_count=1)
# 消費訊息
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) # queue為佇列名,on_message_callback為回撥函數,收到訊息後,會呼叫回撥函數
print(" [x] Awaiting RPC requests")
channel.start_consuming() # 開始接收訊息,進入阻塞狀態,等待訊息,直到收到訊息為止,收到訊息後,會呼叫on_request函數
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
# 指定使用者名稱和密碼,如果rabbitmq沒有設定使用者名稱和密碼,可以不指定
self.credentials = pika.PlainCredentials("admin", "admin")
# host指定rabbitmq伺服器ip地址,credentials指定使用者名稱和密碼
self.connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=self.credentials))
self.channel = self.connection.channel()
# 宣告一個隨機佇列,用來接收rpc_server返回的結果
result = self.channel.queue_declare(queue='', exclusive=True) # exclusive=True表示這個佇列只能被當前連線使用,當連線關閉時,佇列會被刪除,exclusive=True是為了防止多個使用者端同時使用一個佇列
# 獲取隨機佇列的名稱
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response, # 消費訊息
auto_ack=True # 自動傳送ack訊息,告訴rabbitmq,訊息已經被處理
)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
# 生成一個隨機的correlation_id, 用來標識訊息, 使用者端和伺服器端都會用這個id來標識訊息,
# 使用者端會將這個id傳遞給伺服器端, 伺服器端會將這個id傳遞給使用者端, 使用者端和伺服器端都會將這個id與自己的id進行比較, 如果不一致, 則丟棄這個訊息
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue', # 將訊息傳送到rpc_queue佇列
properties=pika.BasicProperties( # 訊息屬性, 用來標識訊息
reply_to=self.callback_queue, # 將訊息傳送到使用者端的回撥函數, 用來接收伺服器端返回的結果, 伺服器端會將結果傳送到這個佇列
correlation_id=self.corr_id, # 將使用者端的crrelation_id傳送給伺服器端
),
body=str(n) # 將訊息傳送給伺服器端, 伺服器端會將這個訊息作為引數傳遞給fib函數
)
while self.response is None: # 如果沒有收到伺服器端返回的結果, 則一直等待, 直到收到結果, 然後返回結果
self.connection.process_data_events() # 非阻塞版的start_consuming(), 用來接收訊息
return int(self.response)
fibonacci_rpc = FibonacciRpcClient() # 範例化使用者端, 用來傳送訊息, 並接收伺服器端返回的結果, 並返回結果, 用來呼叫伺服器端的方法
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(10) # 呼叫call方法, 傳送訊息, 並接收伺服器端返回的結果, 然後列印結果
print(" [.] Got %r" % response)
十二:python中的rpc框架
python自帶的: SimpleXMLRPCServer(封包大,資料慢) - HTTP通訊
第三方: ZeroRPC(底層使用ZeroMQ和MessagePack,速度快,響應時間短,並行高),grpc(谷歌推出支援誇語言) - TCP通訊
十三:SimpleXMLRPCServer
from xmlrpc.server import SimpleXMLRPCServer
class RPCServer(object):
def __init__(self):
# 初始化父類別,python3中不需要,python2中需要,否則會報錯
super(RPCServer, self).__init__()
print(self)
self.send_data = {'server:' + str(i): i for i in range(100)}
self.recv_data = None
def getObj(self): # 接收資料
return self.send_data
def sendObj(self, data): # 傳送資料
print('send data')
self.recv_data = data
print(self.recv_data)
# 建立一個伺服器,監聽本機的8000埠,並設定允許存取的ip地址,如果不設定,預設只能本機存取
server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
# 註冊內省函數,可以檢視伺服器提供的方法,不註冊的話,使用者端只能呼叫register_function註冊的方法
# 為什麼要註冊內省函數呢?因為使用者端呼叫方法時,是通過方法名來呼叫的,如果不註冊內省函數,使用者端就不知道伺服器提供了哪些方法
server.register_introspection_functions()
# 註冊範例,可以呼叫範例的方法,不註冊的話,使用者端只能呼叫register_function註冊的方法
server.register_instance(RPCServer())
# 開始監聽請求,進入阻塞狀態,等待請求,直到收到請求為止,收到請求後,會呼叫註冊的方法
server.serve_forever()
import time
from xmlrpc.client import ServerProxy
def xmlrpc_client():
print('xmlrpc client start')
# 建立一個伺服器代理,指定伺服器的ip地址和埠
c = ServerProxy('http://localhost:4242')
# 呼叫伺服器的方法
data = {'client:' + str(i): i for i in range(100)}
start = time.clock() # 計時
for i in range(5): # 重複呼叫50次
a = c.getObj() # 呼叫伺服器的方法
print(a)
for i in range(5): # 重複呼叫50次
c.sendObj(data) # 呼叫伺服器的方法
print('xmlrpc total time %s' % (time.clock() - start))
if __name__ == '__main__':
xmlrpc_client()
十四:ZeroRPC實現rpc
import zerorpc
class RPCServer(object):
def __init__(self):
print(self)
self.send_data = {'server:'+str(i): i for i in range(100)}
self.recv_data = None
def getObj(self):
print('get data')
return self.send_data
def sendObj(self, data):
print('send data')
self.recv_data = data
print(self.recv_data)
# 建立一個伺服器,監聽本機的8000埠,並設定允許存取的ip地址,如果不設定,預設只能本機存取
s = zerorpc.Server(RPCServer())
# 繫結埠,並設定允許存取的ip地址,如果不設定,預設只能本機存取
s.bind('tcp://0.0.0.0:4243')
# 開始監聽請求,進入阻塞狀態,等待請求,直到收到請求為止,收到請求後,會呼叫註冊的方法
s.run()
import zerorpc
class RPCServer(object):
def __init__(self):
print(self)
self.send_data = {'server:'+str(i): i for i in range(100)}
self.recv_data = None
def getObj(self):
print('get data')
return self.send_data
def sendObj(self, data):
print('send data')
self.recv_data = data
print(self.recv_data)
# 建立一個伺服器,監聽本機的8000埠,並設定允許存取的ip地址,如果不設定,預設只能本機存取
s = zerorpc.Server(RPCServer())
# 繫結埠,並設定允許存取的ip地址,如果不設定,預設只能本機存取
s.bind('tcp://0.0.0.0:4243')
# 開始監聽請求,進入阻塞狀態,等待請求,直到收到請求為止,收到請求後,會呼叫註冊的方法
s.run()
十五:什麼是RPC?
1.RPC介紹?
RPC 是指遠端過程呼叫,也就是說兩臺伺服器,A 和 B,一個應用部署在A 伺服器上,想要呼叫B 伺服器上應用提供的函數或方法,由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語句和傳達呼叫的資料。
2.RPC是如何呼叫的?
1.要解決通訊的問題,主要是通過在使用者端和伺服器之間建立TCP連線,遠端過程呼叫的所有互動的資料都在這個連線裡傳輸。連線可以是按需連線,呼叫結束後就斷掉,也可以是長連線,多個遠端呼叫共用同一個連線。
2.要解決定址的問題,也就是說,A伺服器上的應用怎麼怎麼告訴底層的 RPC 框架,如何連線到 B 伺服器(如主機或IP地址)以及特定的埠,方法的名稱是什麼,這樣才能完成呼叫。比如基於Wbe服務協定棧的RPC,就要提供一個endpoint URl, 或者是 UDDI服務上查詢。如果是RMI呼叫的話,還需要一個RMI Registry 來註冊服務的地址。
3.當A伺服器上的應用發起遠端過程呼叫時,方法的引數需要通過底層的網路協定如TCP傳輸到B伺服器。由於網路協定是基於二進位制的,記憶體中的引數的值要序列化成二進位制形式,也就是序列化(Serialize) 或編組(marshal),通過定址和傳輸序列化的二進位制傳送給B伺服器。
4.B伺服器收到請求後,需要對引數進行反序列化(序列化的逆操作),恢復為記憶體中的表達方式,然後找到對應的方法(定址的一部分)進行本地呼叫,然後得到返回值。
5.返回值還要傳送回伺服器A上的應用,也要經過序列化的方式傳送,伺服器A接收到後,再反序列化,恢復為記憶體中的表達方式,交給A伺服器上的應用。
3.為什麼要使用RPC?
就是無法在一個程序內,甚至一個計算機內通過本地呼叫的方式完成需求,比如不同的系統間的通訊,甚至不同的組織間的通訊。由於計算能力需要橫向擴充套件,需要在多臺機器組成的叢集上部署應用。
4.常見的RPC框架
功能 |
Hessian |
Montan |
rpcx |
gRPC |
Thrift |
Dubbo |
Dubbox |
Spring Cloud |
開發語言 |
跨語言 |
Java |
Go |
跨語言 |
跨語言 |
Java |
Java |
Java |
分散式(服務治理) |
× |
√ |
√ |
× |
× |
√ |
√ |
√ |
多序列化框架支援 |
hessian |
√(支援Hessian2、Json,可延伸) |
√ |
× 只支援protobuf) |
×(thrift格式) |
√ |
√ |
√ |
多種註冊中心 |
× |
√ |
√ |
× |
× |
√ |
√ |
√ |
管理中心 |
× |
√ |
√ |
× |
× |
√ |
√ |
√ |
跨程式語言 |
√ |
×(支援php client和C server) |
× |
√ |
√ |
× |
× |
× |
支援REST |
× |
× |
× |
× |
× |
× |
√ |
√ |
關注度 |
低 |
中 |
低 |
中 |
中 |
中 |
高 |
中 |
上手難度 |
低 |
低 |
中 |
中 |
中 |
低 |
低 |
中 |
運維成本 |
低 |
中 |
中 |
中 |
低 |
中 |
中 |
中 |
開源機構 |
Caucho |
Weibo |
Apache |
Google |
Apache |
Alibaba |
Dangdang |
Apache |
5.實際的場景中的選擇
# Spring Cloud : Spring全家桶,用起來很舒服,只有你想不到,沒有它做不到。可惜因為釋出的比較晚,國內還沒出現比較成功的案例,大部分都是試水,不過畢竟有Spring作背書,還是比較看好。
# Dubbox: 相對於Dubbo支援了REST,估計是很多公司選擇Dubbox的一個重要原因之一,但如果使用Dubbo的RPC呼叫方式,服務間仍然會存在API強依賴,各有利弊,懂的取捨吧。
# Thrift: 如果你比較高冷,完全可以基於Thrift自己搞一套抽象的自定義框架吧。
# Montan: 可能因為出來的比較晚,目前除了新浪微博16年初發布的,
# Hessian: 如果是初創公司或系統數量還沒有超過5個,推薦選擇這個,畢竟在開發速度、運維成本、上手難度等都是比較輕量、簡單的,即使在以後遷移至SOA,也是無縫遷移。
# rpcx/gRPC: 在服務沒有出現嚴重效能的問題下,或技術棧沒有變更的情況下,可能一直不會引入,即使引入也只是小部分模組優化使用。