ROS應用層通訊協定解析

2022-11-19 18:00:22

參考:http://wiki.ros.org/ROS/Master_API

http://wiki.ros.org/ROS/Connection Header

說明

ROS本質上就是一個鬆耦合的通訊框架,通訊模式包括:遠端呼叫(service-client)、訂閱釋出(topic)、持續通訊(action)和全域性引數(引數伺服器),這四種模式基本已經能夠涵蓋百分之九十的應用場景了

本次針對訂閱釋出模式,探究一下ROS通訊中的具體通訊協定,讀完本文後,你可以在不依賴ROS的情況下和ROS通訊

本次通訊採用從機訂閱主機資料,通過wireshark抓包,得到具體xmlrpc協定資料內容,根據xmlrpc協定格式,找到對應程式碼

(因為時間有限,部分協定可能有跳過的地方)

1、registerPublisher

從機建立節點的第一步

這個方法用於註冊一個釋出者的caller

request報文body:

<?xml version="1.0"?>
<methodCall>
    <methodName>registerPublisher</methodName>
    <params>
        <param>
            <value>/test_sub</value>
        </param>
        <param>
            <value>/rosout</value>
        </param>
        <param>
            <value>rosgraph_msgs/Log</value>
        </param>
        <param>
            <value>http://192.168.1.150:40209</value>
        </param>
    </params>
</methodCall>

response報文body:

<?xml version='1.0'?>
<methodResponse>
    <params>
        <param>
            <value>
                <array>
                    <data>
                        <value>
                            <int>1</int>
                        </value>
                        <value>
                            <string>Registered [/test_sub] as publisher of [/rosout]</string>
                        </value>
                        <value>
                            <array>
                                <data>
                                    <value>
                                        <string>http://sherlock:35861/</string>
                                    </value>
                                </data>
                            </array>
                        </value>
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodResponse>

先說結論:

ROS有一個紀錄檔相關的topic,名稱是 /rosout,所有節點的紀錄檔資訊都會通過這個 topic 釋出出來

ROS還有一個紀錄檔相關的節點,名稱是 /rosout,負責訂閱 /rosout資料,然後使用名稱為 /rosout_agg 的topic釋出出來, /rosout_agg 的訂閱者是rqt等偵錯工具

所以,結合上面的xml內容,我們可以大致推斷,建立一個新的節點,那麼這個節點必定會發佈一個topic,就是/rosout,所以上面的XMLRPC協定內容,就是網rosmaster內註冊一個publisher,用於釋出/rosout

整體來說,就是呼叫介面

registerPublisher("/test_sub", "/rosout", "rosgraph_msgs/Log", "http://192.168.1.150:40209")

返回值是:

1
Registered [/test_sub] as publisher of [/rosout]
http://sherlock:35861/

1是固定資料

第二行是message

最後一個返回值表示訂閱者的uri列表,這裡因為只有一個訂閱者,所有隻有一個uri

再看程式碼:

函數宣告如下:

registerPublisher(caller_id, topic, topic_type, caller_api)
	Register the caller as a publisher the topic.
	引數
		caller_id (str)
			ROS caller ID
		topic (str)
			Fully-qualified name of topic to register.
		topic_type (str)
			Datatype for topic. Must be a package-resource name, i.e. the .msg name.
		caller_api (str)
			API URI of publisher to register.
	返回值(int, str, [str])
		(code, statusMessage, subscriberApis)
		List of current subscribers of topic in the form of XMLRPC URIs.

找到 registerPublisher 接介面,位於ros_comm/rosmaster 包中,檔案為:master_api.py(ROS主從機制利用python實現,拿掉python則無法實現主從)

@apivalidate([], ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
def registerPublisher(self, caller_id, topic, topic_type, caller_api):
	"""
    Register the caller as a publisher the topic.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @param topic: Fully-qualified name of topic to register.
    @type  topic: str
	@param topic_type: Datatype for topic. Must be a
    package-resource name, i.e. the .msg name.
	@type  topic_type: str
    @param caller_api str: ROS caller XML-RPC API URI
    @type  caller_api: str
    @return: (code, statusMessage, subscriberApis).
    List of current subscribers of topic in the form of XMLRPC URIs.
    @rtype: (int, str, [str])
    """
    #NOTE: we need topic_type for getPublishedTopics.
	try:
        self.ps_lock.acquire()
        self.reg_manager.register_publisher(topic, caller_id, caller_api)
        # don't let '*' type squash valid typing
        if topic_type != rosgraph.names.ANYTYPE or not topic in self.topics_types:
            self.topics_types[topic] = topic_type
        pub_uris = self.publishers.get_apis(topic)
        sub_uris = self.subscribers.get_apis(topic)
        self._notify_topic_subscribers(topic, pub_uris, sub_uris)
        mloginfo("+PUB [%s] %s %s",topic, caller_id, caller_api)
        sub_uris = self.subscribers.get_apis(topic)            
    finally:
    	self.ps_lock.release()
	return 1, "Registered [%s] as publisher of [%s]"%(caller_id, topic), sub_uris

registerPublisher 介面的註釋:Register the caller as a publisher the topic,將呼叫者註冊為一個topic釋出者

可以對應xmlrpc中對應引數,加上猜測:

caller_id:呼叫者,可以認為是節點,/test_sub,從及建立的節點

topic:釋出的topic name,/rosout

topic_type:釋出的topic資料型別,rosgraph_msgs/Log

caller_api:呼叫者釋出資料的API介面,http://192.168.1.150:40209

總上,我們大概有幾點猜測:

  1. 介面在rosmaster中,介面是registerPublisher,表示,這是註冊節點的
  2. 告訴master節點,我建立了一個節點,節點名是/test_sub
  3. 告訴master,這個節點需要釋出topic,topic名是/rosout,資料型別是rosgraph_msgs/Log

registerPublisher 介面中有三個地方需要注意:

  1. register_publisher介面呼叫
  2. _notify_topic_subscribers介面呼叫,告知當前所有的subscriber,有新的publisher,他們需要再次到新的publisher中去訂閱資料
  3. return 內容,最後會拼接成xmlrpc的報文,response 回去,這也就順便解釋了第二條xmlrpc報文(response)

register_publisher

先看 register_publisher,程式碼在rosmaster中的registrations.py檔案中

def register_publisher(self, topic, caller_id, caller_api):
    """
	Register topic publisher
    @return: None
    """
    self._register(self.publishers, topic, caller_id, caller_api)

_register 介面,這個節點做了三件事

  1. 呼叫內部介面儲存節點資訊
  2. 如果這個節點之前已經存在,就表明它是在更新,則釋出資料的介面改變,且之前已經有訂閱,則此時所有訂閱該介面的所有subscriber解除訂閱
  3. 呼叫register介面儲存
def _register(self, r, key, caller_id, caller_api, service_api=None):
    # update node information
    node_ref, changed = self._register_node_api(caller_id, caller_api)
    node_ref.add(r.type, key)
    # update pub/sub/service indicies
    if changed:
        self.publishers.unregister_all(caller_id)
        self.subscribers.unregister_all(caller_id)
        self.services.unregister_all(caller_id)
        self.param_subscribers.unregister_all(caller_id)
	r.register(key, caller_id, caller_api, service_api)

_register_node_api 介面,我們可以看到,它主要做兩件事

  1. 更新master中節點資訊(節點名、節點發布資料的介面)
  2. 檢查這個節點是不是已經存在,如果是,則告訴呼叫者
def _register_node_api(self, caller_id, caller_api):
    """
    @param caller_id: caller_id of provider
    @type  caller_id: str
    @param caller_api: caller_api of provider
    @type  caller_api: str
    @return: (registration_information, changed_registration). changed_registration is true if 
    caller_api is differet than the one registered with caller_id
    @rtype: (NodeRef, bool)
    """
    node_ref = self.nodes.get(caller_id, None)

    bumped_api = None
    if node_ref is not None:
        if node_ref.api == caller_api:
            return node_ref, False
        else:
            bumped_api = node_ref.api
            self.thread_pool.queue_task(bumped_api, shutdown_node_task,
                                        (bumped_api, caller_id, "new node registered with same name"))

    node_ref = NodeRef(caller_id, caller_api)
    self.nodes[caller_id] = node_ref
    return (node_ref, bumped_api != None)

_notify_topic_subscribers

_notify_topic_subscribers 程式碼,根據註釋說明,介面的作用就是通知所有的subscriber,有新的publisher

def _notify_topic_subscribers(self, topic, pub_uris, sub_uris):
    """
    Notify subscribers with new publisher list
    @param topic: name of topic
    @type  topic: str
    @param pub_uris: list of URIs of publishers.
    @type  pub_uris: [str]
    """
    self._notify(self.subscribers, publisher_update_task, topic, pub_uris, sub_uris)

_notify 程式碼,將更新的通知任務(publisher_update_task)放進事件佇列中,等待執行:

def _notify(self, registrations, task, key, value, node_apis):
    """
    Generic implementation of callback notification
    @param registrations: Registrations
    @type  registrations: L{Registrations}
    @param task: task to queue
    @type  task: fn
    @param key: registration key
    @type  key: str
    @param value: value to pass to task
    @type  value: Any
    """
    # cache thread_pool for thread safety
    thread_pool = self.thread_pool
    if not thread_pool:
        return
    
    try:            
        for node_api in node_apis:
            # use the api as a marker so that we limit one thread per subscriber
            thread_pool.queue_task(node_api, task, (node_api, key, value))
    except KeyError:
        _logger.warn('subscriber data stale (key [%s], listener [%s]): node API unknown'%(key, s))

publisher_update_task 程式碼,傳入的三個引數分別是:新節點的介面、topic名稱、訂閱者的介面:

def publisher_update_task(api, topic, pub_uris):
    """
    Contact api.publisherUpdate with specified parameters
    @param api: XML-RPC URI of node to contact
    @type  api: str
    @param topic: Topic name to send to node
    @type  topic: str
    @param pub_uris: list of publisher APIs to send to node
    @type  pub_uris: [str]
    """
    msg = "publisherUpdate[%s] -> %s %s" % (topic, api, pub_uris)
    mloginfo(msg)
    start_sec = time.time()
    try:
        #TODO: check return value for errors so we can unsubscribe if stale
        ret = xmlrpcapi(api).publisherUpdate('/master', topic, pub_uris)
        msg_suffix = "result=%s" % ret
    except Exception as ex:
        msg_suffix = "exception=%s" % ex
        raise
    finally:
        delta_sec = time.time() - start_sec
        mloginfo("%s: sec=%0.2f, %s", msg, delta_sec, msg_suffix)

publisherUpdate 介面在 rospy 模組的 masterslave.py 檔案中,猜測是使用XMLRPC協定,通知所有的訂閱者節點,釋出者更新了

@apivalidate(-1, (is_topic('topic'), is_publishers_list('publishers')))
def publisherUpdate(self, caller_id, topic, publishers):
    """
    Callback from master of current publisher list for specified topic.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @param topic str: topic name
    @type  topic: str
    @param publishers: list of current publishers for topic in the form of XMLRPC URIs
    @type  publishers: [str]
    @return: [code, status, ignore]
    @rtype: [int, str, int]
    """
    if self.reg_man:
        for uri in publishers:
            self.reg_man.publisher_update(topic, publishers)
    return 1, "", 0

2、hasParam

request報文body

<?xml version='1.0'?>
<methodResponse>
    <params>
        <param>
            <value>
                <array>
                    <data>
                        <value>
                            <int>1</int>
                        </value>
                        <value>
                            <string>/use_sim_time</string>
                        </value>
                        <value>
                            <boolean>0</boolean>
                        </value>
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodResponse>

response報文body

<?xml version="1.0"?>
<methodCall>
    <methodName>requestTopic</methodName>
    <params>
        <param>
            <value>/rosout</value>
        </param>
        <param>
            <value>/rosout</value>
        </param>
        <param>
            <value>
                <array>
                    <data>
                        <value>
                            <array>
                                <data>
                                    <value>TCPROS</value>
                                </data>
                            </array>
                        </value>
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodCall>

先說結論:

呼叫 hasParam 介面,檢查引數伺服器是否有引數 /test_sub/use_sim_time

返回值是 [1, /use_sim_time, 0],表示沒有

再看程式碼:

hasParam

有了上面的分析經驗,我們可以很輕鬆地的出結論,這是在呼叫 hasParam 介面

我們可以很輕鬆地找到這個方法的程式碼,在rosmaster模組的master_api.py檔案中

@apivalidate(False, (non_empty_str('key'),))
def hasParam(self, caller_id, key):
    """
    Check if parameter is stored on server. 
    @param caller_id str: ROS caller id
    @type  caller_id: str
    @param key: parameter to check
    @type  key: str
    @return: [code, statusMessage, hasParam]
    @rtype: [int, str, bool]
    """
    key = resolve_name(key, caller_id)
    if self.param_server.has_param(key):
        return 1, key, True
    else:
        return 1, key, False 

根據協定

  1. caller_id 傳參是 /test_sub
  2. key 傳參是 /use_sim_time

根據註釋和程式碼,我們可以確認,這個就口就是在檢查,引數伺服器是否有引數 /test_sub/use_sim_time

resolve_name 介面接收兩個引數,根據呼叫:

  1. name是/use_sim_time
  2. namespace_是/test_sub

所以,才能確認上面的全域性引數 /test_sub/use_sim_time

hasParam 介面的返回值有三個

  1. code,整型,這裡無論有沒有,都返回1,可以忽略
  2. key,這裡就是 /use_sim_time
  3. hasParam,表示是否有這個引數,True/False

根據 response 報文,這裡應該返回 [1, /use_sim_time, 0],表示沒有這個引數

use_sim_time

想要理解為什麼要呼叫這個介面,就要理解 use_sim_time 引數的作用

use_sim_time是一個重要的引數,它預設值為false,可以配合Rosbag使用,是一個很重要的離線偵錯工具

我們都知道,ROS 中的時間有兩種:

  1. ROS::Time()
  2. ROS::WallTime()

ROS::Time()和ROS::WallTime()

表示ROS網路中的時間,如果當時在非模擬環境裡執行,那它就是當前的時間。但是假設去回放當時的情況,那就需要把當時的時間錄下來

以控制為例,很多的資料處理需要知道當時某一個時刻發生了什麼。Wall Time可以理解為牆上時間,牆上掛著的時間沒有人改變的了,永遠在往前走;ROS Time可以被人為修改,你可以暫停它,可以加速,可以減速,但是Wall Time不可以。

在開啟一個Node之前,當把use_sim_time設定為true時,這個節點會從clock Topic獲得時間。所以操作這個clock的釋出者,可以實現一個讓Node中得到ROS Time暫停、加速、減速的效果。同時下面這些方面都是跟Node透明的,所以非常適合離線的偵錯方式。當把ROSbag記下來以後重新play出來時,加兩個橫槓,--clock,它就會發布出這個訊息

3、registerSubscriber

先看報文:

request報文body

<?xml version="1.0"?>
<methodCall>
    <methodName>registerSubscriber</methodName>
    <params>
        <param>
            <value>/test_sub</value>
        </param>
        <param>
            <value>/ros_message</value>
        </param>
        <param>
            <value>my_package/MessageDefine</value>
        </param>
        <param>
            <value>http://192.168.1.150:43597</value>
        </param>
    </params>
</methodCall>

response報文body

<?xml version='1.0'?>
<methodResponse>
    <params>
        <param>
            <value>
                <array>
                    <data>
                        <value>
                            <int>1</int>
                        </value>
                        <value>
                            <string>Subscribed to [/ros_message]</string>
                        </value>
                        <value>
                            <array>
                                <data>
                                    <value>
                                        <string>http://sherlock:41689/</string>
                                    </value>
                                </data>
                            </array>
                        </value>
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodResponse>

registerSubscriber

registerSubscriber 程式碼在rosmaster包中的master_api.py檔案中,如下:

@apivalidate([], ( is_topic('topic'), valid_type_name('topic_type'), is_api('caller_api')))
def registerSubscriber(self, caller_id, topic, topic_type, caller_api):
    """
    Subscribe the caller to the specified topic. In addition to receiving
    a list of current publishers, the subscriber will also receive notifications
    of new publishers via the publisherUpdate API.        
    @param caller_id: ROS caller id
    @type  caller_id: str
    @param topic str: Fully-qualified name of topic to subscribe to. 
    @param topic_type: Datatype for topic. Must be a package-resource name, i.e. the .msg name.
    @type  topic_type: str
    @param caller_api: XML-RPC URI of caller node for new publisher notifications
    @type  caller_api: str
    @return: (code, message, publishers). Publishers is a list of XMLRPC API URIs
       for nodes currently publishing the specified topic.
    @rtype: (int, str, [str])
    """
    #NOTE: subscribers do not get to set topic type
    try:
        self.ps_lock.acquire()
        self.reg_manager.register_subscriber(topic, caller_id, caller_api)

        # ROS 1.1: subscriber can now set type if it is not already set
        #  - don't let '*' type squash valid typing
        if not topic in self.topics_types and topic_type != rosgraph.names.ANYTYPE:
            self.topics_types[topic] = topic_type

        mloginfo("+SUB [%s] %s %s",topic, caller_id, caller_api)
        pub_uris = self.publishers.get_apis(topic)
    finally:
        self.ps_lock.release()
    return 1, "Subscribed to [%s]"%topic, pub_uris

根據協定往來,我們可以看到呼叫過程

registerSubscriber("/test_sub", "/ros_message", "my_package/MessageDefine", "http://192.168.1.150:43597")

入參有4個:

  1. 訂閱節點名:/test_sub
  2. 需要訂閱的topic 名稱:/ros_message
  3. topic的資料型別:my_package/MessageDefine
  4. 訂閱節點自己的uri,即釋出者通知時的傳送目標

返回值有3個:

  1. code,這裡固定是1

  2. message,這裡是 Subscribed to [/ros_message]

  3. publisher 的訂閱URI 列表,因為這裡只有一個publisher,所以只有一個 http://sherlock:41689/,首先,這可能是主機裡面某個幾點的uri,需要從機去訂閱

程式碼說明:

和第一條,註冊publisher相反,這裡是註冊subscriber

有幾個關鍵程式碼

register_subscriber 程式碼,位於rosmaster包中的registerations.py檔案中:

def register_subscriber(self, topic, caller_id, caller_api):
    """
    Register topic subscriber
    @return: None
    """
    self._register(self.subscribers, topic, caller_id, caller_api)

_register 程式碼,呼叫 _register_node_api 介面更新節點資訊,如果之前有該節點的註冊資訊,則先刪除:

def _register(self, r, key, caller_id, caller_api, service_api=None):
    # update node information
    node_ref, changed = self._register_node_api(caller_id, caller_api)
    node_ref.add(r.type, key)
    # update pub/sub/service indicies
    if changed:
        self.publishers.unregister_all(caller_id)
        self.subscribers.unregister_all(caller_id)
        self.services.unregister_all(caller_id)
        self.param_subscribers.unregister_all(caller_id)
    r.register(key, caller_id, caller_api, service_api)

_register_node_api 程式碼:

def _register_node_api(self, caller_id, caller_api):
    """
    @param caller_id: caller_id of provider
    @type  caller_id: str
    @param caller_api: caller_api of provider
    @type  caller_api: str
    @return: (registration_information, changed_registration). changed_registration is true if 
    caller_api is differet than the one registered with caller_id
    @rtype: (NodeRef, bool)
    """
    node_ref = self.nodes.get(caller_id, None)

    bumped_api = None
    if node_ref is not None:
        if node_ref.api == caller_api:
            return node_ref, False
        else:
            bumped_api = node_ref.api
            self.thread_pool.queue_task(bumped_api, shutdown_node_task,
                                        (bumped_api, caller_id, "new node registered with same name"))

    node_ref = NodeRef(caller_id, caller_api)
    self.nodes[caller_id] = node_ref
    return (node_ref, bumped_api != None)

shutdown_node_task 程式碼,如果訂閱節點退出了,則需要通知:

def shutdown_node_task(api, caller_id, reason):
    """
    Method to shutdown another ROS node. Generally invoked within a
    separate thread as this is used to cleanup hung nodes.
    
    @param api: XML-RPC API of node to shutdown
    @type  api: str
    @param caller_id: name of node being shutdown
    @type  caller_id: str
    @param reason: human-readable reason why node is being shutdown
    @type  reason: str
    """
    try:
        xmlrpcapi(api).shutdown('/master', "[{}] Reason: {}".format(caller_id, reason))
    except:
        pass #expected in many common cases
    remove_server_proxy(api)

4、requestTopic

request報文body,如下,我們可以看到,xmlrpc傳送的host是sherlock:41689,是上一步驟,收到的publisher的uri,如果有多個publisher,要request多次

POST /RPC2 HTTP/1.1
Host: sherlock:41689
User-Agent: Go-http-client/1.1
Content-Length: 307
Content-Type: text/xml
Accept-Encoding: gzip

<?xml version="1.0"?>
<methodCall>
    <methodName>requestTopic</methodName>
    <params>
        <param>
            <value>/test_sub</value>
        </param>
        <param>
            <value>/ros_message</value>
        </param>
        <param>
            <value>
                <array>
                    <data>
                        <value>
                            <array>
                                <data>
                                    <value>TCPROS</value>
                                </data>
                            </array>
                        </value>
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodCall>

response報文

<?xml version="1.0"?>
<methodResponse>
    <params>
        <param>
            <value>
                <array>
                    <data>
                        <value>
                            <i4>1</i4>
                        </value>
                        <value></value>
                        <value>
                            <array>
                                <data>
                                    <value>TCPROS</value>
                                    <value>sherlock</value>
                                    <value>
                                        <i4>33173</i4>
                                    </value>
                                </data>
                            </array>
                        </value>
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodResponse>

根據協定,呼叫過程如下:

requestTopic("/test_sub", "/ros_message", ["TCPROS"])

告訴publisher,subscriber準備好了,可以發資料了

requestTopic

_remap_table['requestTopic'] = [0] # remap topic 
@apivalidate([], (is_topic('topic'), non_empty('protocols')))
def requestTopic(self, caller_id, topic, protocols):
    """
    Publisher node API method called by a subscriber node.

    Request that source allocate a channel for communication. Subscriber provides
    a list of desired protocols for communication. Publisher returns the
    selected protocol along with any additional params required for
    establishing connection. For example, for a TCP/IP-based connection,
    the source node may return a port number of TCP/IP server. 
    @param caller_id str: ROS caller id    
    @type  caller_id: str
    @param topic: topic name
    @type  topic: str
    @param protocols: list of desired
     protocols for communication in order of preference. Each
     protocol is a list of the form [ProtocolName,
     ProtocolParam1, ProtocolParam2...N]
    @type  protocols: [[str, XmlRpcLegalValue*]]
    @return: [code, msg, protocolParams]. protocolParams may be an
    empty list if there are no compatible protocols.
    @rtype: [int, str, [str, XmlRpcLegalValue*]]
    """
    if not get_topic_manager().has_publication(topic):
        return -1, "Not a publisher of [%s]"%topic, []
    for protocol in protocols: #simple for now: select first implementation 
        protocol_id = protocol[0]
        for h in self.protocol_handlers:
            if h.supports(protocol_id):
                _logger.debug("requestTopic[%s]: choosing protocol %s", topic, protocol_id)
                return h.init_publisher(topic, protocol)
    return 0, "no supported protocol implementations", []

init_publisher 程式碼

def init_publisher(self, resolved_name, protocol):
    """
    Initialize this node to receive an inbound TCP connection,
    i.e. startup a TCP server if one is not already running.
    
    @param resolved_name: topic name
    @type  resolved__name: str
    
    @param protocol: negotiated protocol
      parameters. protocol[0] must be the string 'TCPROS'
    @type  protocol: [str, value*]
    @return: (code, msg, [TCPROS, addr, port])
    @rtype: (int, str, list)
    """
    if protocol[0] != TCPROS:
        return 0, "Internal error: protocol does not match TCPROS: %s"%protocol, []
    start_tcpros_server()
    addr, port = get_tcpros_server_address()
    return 1, "ready on %s:%s"%(addr, port), [TCPROS, addr, port]

publisher 檢查,是否支援指定協定,如果不支援,則返回1,否則返回0

返回值的第二個引數有三個值,分別是協定型別、ip地址 和 埠

5、unregisterSubscriber

request報文body

<?xml version="1.0"?>
<methodCall>
    <methodName>unregisterSubscriber</methodName>
    <params>
        <param>
            <value>/test_sub</value>
        </param>
        <param>
            <value>/ros_message</value>
        </param>
        <param>
            <value>http://192.168.1.150:43597</value>
        </param>
    </params>
</methodCall>

response報文body

<?xml version='1.0'?>
<methodResponse>
    <params>
        <param>
            <value>
                <array>
                    <data>
                        <value>
                            <int>1</int>
                        </value>
                        <value>
                            <string>Unregistered [/test_sub] as provider of [/ros_message]</string>
                        </value>
                        <value>
                            <int>1</int>
                        </value>
                    </data>
                </array>
            </value>
        </param>
    </params>
</methodResponse>

unregisterSubscriber

@apivalidate(0, (is_topic('topic'), is_api('caller_api')))
def unregisterSubscriber(self, caller_id, topic, caller_api):
    """
    Unregister the caller as a subscriber of the topic.
    @param caller_id: ROS caller id
    @type  caller_id: str
    @param topic: Fully-qualified name of topic to unregister.
    @type  topic: str
    @param caller_api: API URI of service to unregister. Unregistration will only occur if current
       registration matches.    
    @type  caller_api: str
    @return: (code, statusMessage, numUnsubscribed). 
      If numUnsubscribed is zero it means that the caller was not registered as a subscriber.
      The call still succeeds as the intended final state is reached.
    @rtype: (int, str, int)
    """
    try:
        self.ps_lock.acquire()
        retval = self.reg_manager.unregister_subscriber(topic, caller_id, caller_api)
        mloginfo("-SUB [%s] %s %s",topic, caller_id, caller_api)
        return retval
    finally:
        self.ps_lock.release()

取消訂閱,固定返回1

6、TCP資料私有協定

首先保證主從機的資料型別一致,包括欄位的順序,實際ROS框架內是通過md5檢測,保證資料型別一致的

資料傳輸前提:

  1. 資料型別一致
  2. 欄位名一致
  3. 欄位順序一致

資料傳輸模式:小端hex

封包結構

資料域長度 資料域
4 byte n byte

資料域長度固定4byte,長度不包括自身

資料域

資料域根據欄位型別解析,ros 通訊的內建資料型別有:

原始型別 位元組數
bool 1
int8 1
uint8 1
int16 2
uint16 2
int32 4
uint32 4
int64 8
uint64 8
float32 4
float64 8
string n(n > 4)
time 8
duration 8
陣列 n(n > 4)

其中,除 string、time、duration陣列 型別外的其餘型別,直接根據位元組數讀取即可

string

字串型別,也可認為是字元陣列(則可以和陣列型別複用),因為是不定長度,所以需要知道字串的長度,ROS中使用uint32型別表示長度/陣列元素數量,即4byte

所以,如果出現字串型別,則資料域為:

字串長度 字元
4 byte n byte

陣列

陣列型別,因為是不定長度,所以需要知道陣列的元素數量,和string同理,ROS 中使用uint32型別表示陣列的元素數量,再結合陣列元素的型別,即可得到總長度

所以,出現陣列型別,則資料域為:

陣列元素數量 陣列資料
4 byte n byte

如果陣列型別是 int32,則陣列資料佔 4 * n byte,其餘型別以此類推

time

ROS 中把 time 單獨提取作為基本資料型別,對應 ROS 中的 ros::Time 類,因為我們可以認為是巢狀型別

ros::Time 有兩個欄位:

  1. sec: uint32
  2. nsec: uint32

所以,time 型別在資料域佔8byte,如果出現 time 型別,則資料域為:

sec nsec
4 byte 4 byte

duration

duration 型別和 time 相同,在 ROS 中對應 ros::Duration 類,可以認為是巢狀型別

ros::Duration 有兩個欄位:

  1. sec: uint32
  2. nsec: uint32

所以,duration 型別在資料域中佔8byte,如果出現 duration 型別,則資料域為:

sec nsec
4 byte 4 byte

巢狀型別

巢狀型別可以認為是資料域的組合,如果發現欄位型別不是內建資料型別,則可認為是巢狀型別,巢狀型別按照型別的欄位,遞迴處理即可

協定分析範例

範例1:

.msg 檔案為:

int8 shutdown_time
string text

主機發出資料為:

shutdown_time = 123
text = abc

從機收到資料為:

08 00 00 00 7b 03 00 00 00 61 62 63

分析如下:

  1. 包頭4 byte表示資料與長度

    08 00 00 00,表示資料域長度為8,即後續資料總長度為8

  2. 欄位1為shutdown_time,型別是int8,1byte

    7b轉10進位制,為123

  3. 欄位2為text,型別是字串 (4+n)byte

    4byte 長度:03 00 00 00,表示字串長度為3,後面3byte 為字串內容:61 62 63,ASCII轉換為:abc)

範例2:

.msg 檔案為:

Header header
int8 shutdown_time
int32 shutdown_time2
string text
float32 num
string text2
int8[] data
int16[] data2

Header的資料型別為:

uint32 seq
time stamp
string frame_id

主機發出資料為:

//header一般由ROS系統自己處理,這裡寫出來是為了方便觀察
header.seq = 29;
header.time.sec = 0;
header.time.nsec = 0;
header.frame_id = "";
shutdown_time = 123;
shutdown_time2 = 987654;
text2 = "lmn";
text = "abc";
num = 23.4;
data = [1, 2, 4, 89];
data2 = [11, 22, 908]

從機收到的資料為:

39 00 00 00 1d 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 7b 06 12 0f 00 03 00 00 00 61 62 63 33 33 bb 41 03 00 00 00 6c 6d 6e 04 00 00 00 01 02 04 59 03 00 00 00 0b 00 16 00 8c 03

分析如下:

  1. 包頭4byte表示資料域長度

    0x39 00 00 00,10進位制57,表明後續資料域長度57byte

  2. 欄位1,Header 型別,可以認為是巢狀型別,Header欄位如下:

    1. 欄位1,seq,uint32,4byte,資料為,0x1d 00 00 00,十進位制29;
    2. 欄位2,time型別,可以認為是巢狀型別,欄位如下:
      1. 欄位1,sec,uint32,4byte,資料為:0x00 00 00 00,十進位制0;
      2. 欄位2,nsec,uint32,4byte,資料為:0x00 00 00 00,十進位制0;
    3. 欄位3,frame_id,字串型別,4byte 表示長度,00 00 00 00,表示長度為0,字串為空
  3. 欄位2,shutdown_time,int8,1byte,資料為:0x7b,十進位制123;

  4. 欄位3,shutdown_time2,int32,4byte,資料為:0x06 12 0f 00,十進位制:987654;

  5. 欄位4,text,字串:

    1. 4byte 長度,資料為:0x03 00 00 00 ,表示字元產長度為3;
    2. 字串內容,資料為:0x61 62 63 ,ASCII對應:abc;
  6. 欄位5,num,flota32,4byte,資料為:33 33 bb 41,十進位制:23.4;

  7. 欄位6:text2,字串:

    1. 4byte長度,資料為:0x03 00 00 00,表示字串長度為3;
    2. 字元產內容,資料為:0x6c 6d 6e,ASCII對應lmn;
  8. 欄位7,data,int8陣列:

    1. 4byte表示陣列元素數量,資料為:0x04 00 00 00,表示有4個int8元素:
    2. 陣列內容:[0x01, 0x02, 0x04, 0x59,],表示:[1,2,4,89];
  9. 欄位8,data2,int16陣列:

    1. 4byte表示長度,資料為:0x03 00 00 00,表示有3個int16資料;
    2. 陣列內容:[0x0b00, 0x1600, 0x8c03],表示:[11, 22, 908]

7、小結

宗上,整體的從機訂閱時序圖如下: