Fabric v2.2原始碼分析 Broadcast廣播交易服務(二)

2020-11-13 18:00:45

1 Broadcast Client建立流程

在這裡插入圖片描述
Peer節點呼叫GetBroadcastClientFnc函數來獲取Broadcast服務使用者端。使用者端提供了Send(*common.Envelope) 用於傳送交易訊息請求。

  • 根據oderer的設定建立ordererclient使用者端
  • 呼叫NewConnection建立gRPC連線物件conn
  • 呼叫NewAtomicBroadcastClient請求呼叫Broadcast服務介面,建立服務使用者端(atomicBroadcastBroadcastClient)型別
type AtomicBroadcastClient interface {
	// broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure
	Broadcast(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_BroadcastClient, error)
	// deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
	Deliver(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_DeliverClient, error)
}

type atomicBroadcastClient struct {
	cc *grpc.ClientConn
}

func NewAtomicBroadcastClient(cc *grpc.ClientConn) AtomicBroadcastClient {
	return &atomicBroadcastClient{cc}
}

func (c *atomicBroadcastClient) Broadcast(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_BroadcastClient, error) {
	stream, err := c.cc.NewStream(ctx, &_AtomicBroadcast_serviceDesc.Streams[0], "/orderer.AtomicBroadcast/Broadcast", opts...)
	if err != nil {
		return nil, err
	}
	x := &atomicBroadcastBroadcastClient{stream}
	return x, nil
}

type AtomicBroadcast_BroadcastClient interface {
	Send(*common.Envelope) error
	Recv() (*BroadcastResponse, error)
	grpc.ClientStream
}

type atomicBroadcastBroadcastClient struct {
	grpc.ClientStream
}

2 Broadcast服務訊息處理

Orederer節點啟動時已經在本地gRPC伺服器上註冊了Orderer排序伺服器,並建立了Broadcast服務處理控制程式碼,當用戶端呼叫Broadcast()發起服務請求時,會呼叫s.bh.Handle方法處理請求,通過訊息控制程式碼呼叫Handle方法,通過伺服器端srv呼叫srv.Recv(),監聽並接收Send介面傳送的交易訊息請求。

func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
	logger.Debugf("Starting new Broadcast handler")
	defer func() {
		if r := recover(); r != nil {
			logger.Criticalf("Broadcast client triggered panic: %s\n%s", r, debug.Stack())
		}
		logger.Debugf("Closing Broadcast stream")
	}()
	return s.bh.Handle(&broadcastMsgTracer{
		AtomicBroadcast_BroadcastServer: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Broadcast",
		},
	})
}

func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)
	for {
		msg, err := srv.Recv() //阻塞等待新的交易請求
		......
		resp := bh.ProcessMessage(msg, addr)
		err = srv.Send(resp)   //傳送成功處理狀態響應訊息
		if resp.Status != cb.Status_SUCCESS {
			return err
		}
		......
	}

}

ProcessMessage中的關鍵函數:

  • BroadcastChannelSupport解析出訊息的通道頭部chdr,設定交易訊息標誌位isConfig、鏈支援物件processor(ChainSupport型別)
  • processor.WaitReady檢查當前通道共識元件鏈是否已經準備好接收新訊息
  • processor.ProcessNormalMsg處理普通交易訊息
  • processor.Order重新設定普通交易訊息(包含configSeq最新設定序號),交給共識鏈物件請求排序出塊
  • processor.ProcessConfigUpdateMsg處理設定交易訊息
  • processor.Configure構造新的設定交易訊息,交給共識鏈物件請求處理
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
	tracker := &MetricsTracker{
		ChannelID: "unknown",
		TxType:    "unknown",
		Metrics:   bh.Metrics,
	}
	defer func() {
		// This looks a little unnecessary, but if done directly as
		// a defer, resp gets the (always nil) current state of resp
		// and not the return value
		tracker.Record(resp)
	}()
	tracker.BeginValidate()

	chdr, isConfig, processor, err := bh.SupportRegistrar.BroadcastChannelSupport(msg)
	if chdr != nil {
		tracker.ChannelID = chdr.ChannelId
		tracker.TxType = cb.HeaderType(chdr.Type).String()
	}

	if !isConfig {
		logger.Debugf("[channel: %s] Broadcast is processing normal message from %s with txid '%s' of type %s", chdr.ChannelId, addr, chdr.TxId, cb.HeaderType_name[chdr.Type])
		configSeq, err := processor.ProcessNormalMsg(msg)
		......
		err = processor.Order(msg, configSeq)
		......
	} else { // isConfig
		logger.Debugf("[channel: %s] Broadcast is processing config update message from %s", chdr.ChannelId, addr)
		config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
		......
		err = processor.Configure(config, configSeq)
		......
	}

	logger.Debugf("[channel: %s] Broadcast has successfully enqueued message of type %s from %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type], addr)

	return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
}

在這裡插入圖片描述

2.1 普通交易處理

呼叫StandardChannel的ProcessNormalMsg方法處理普通交易資訊

func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
	oc, ok := s.support.OrdererConfig()
	if !ok {
		logger.Panicf("Missing orderer config")
	}
	if oc.Capabilities().ConsensusTypeMigration() {
		if oc.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
			return 0, errors.WithMessage(
				ErrMaintenanceMode, "normal transactions are rejected")
		}
	}

	configSeq = s.support.Sequence()
	err = s.filters.Apply(env)
	return
}
  • 呼叫s.support.Sequence獲取通道設定序號configSeq,預設值為0,新建應用通道設定序號增1,該設定序號可以用來標識通道設定資訊的版本
  • 呼叫 s.filters.Apply,利用自帶的通道資訊過濾器過濾該訊息,檢查是否滿足應用通道上的訊息處理請求

這裡共有四個過濾器:

  • EmptyRejectRule空檢測
  • MaxBytesRule最大位元組數檢測
  • SigFilter訊息簽名驗證(Channel-Writes通道寫許可權)
  • expirationRejectRule 拒絕過期的簽名證書
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
	c.Metrics.NormalProposalsReceived.Add(1)
	return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}

func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
	if err := c.isRunning(); err != nil {
		c.Metrics.ProposalFailures.Add(1)
		return err
	}

	leadC := make(chan uint64, 1)
	select {
	case c.submitC <- &submit{req, leadC}:
		lead := <-leadC
		if lead == raft.None {
			c.Metrics.ProposalFailures.Add(1)
			return errors.Errorf("no Raft leader")
		}

		if lead != c.raftID {
			if err := c.rpc.SendSubmit(lead, req); err != nil {
				c.Metrics.ProposalFailures.Add(1)
				return err
			}
		}

	case <-c.doneC:
		c.Metrics.ProposalFailures.Add(1)
		return errors.Errorf("chain is stopped")
	}

	return nil
}

Submit 首先將請求訊息封裝為 submit 結構通過當前 Chain範例的通道 c.submitC 傳遞給後端處理(下一節分析如何處理),同時獲取當前時刻 raft 叢集的 leader 資訊。

這裡對 leader 的不同狀態進行了不同處理:

  • lead == raft.None:即當前叢集中還沒有選出一個 leader,那麼說明共識功能暫時可不用,所以直接返回 error-「no Raft leader」;
  • lead != c.raftID:即當前節點不是 raft 叢集的 leader,非 leader 不進行訊息處理,所以通過 rpc.SendSubmit 方法將訊息轉發給目標 leader;
  • lead == c.raftID:這是一個的隱含情況,即當前節點為 leader 的情況,那自然是針對請求訊息進行處理,由接收 submitC 通道訊息的部分處理。

也就是說,所有的應用端傳送給 orderer 的 broadcast 請求報文,都會被轉發給 raft 叢集中的 leader 節點進行處理,如果沒有 leader 則返回錯誤資訊。

2.2 設定交易訊息

func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
	channelID, err := protoutil.ChannelID(envConfigUpdate)
	if err != nil {
		return nil, 0, err
	}

	logger.Debugf("Processing config update tx with system channel message processor for channel ID %s", channelID)

	if channelID == s.support.ChannelID() {
		return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
	}

	// XXX we should check that the signature on the outer envelope is at least valid for some MSP in the system channel

	logger.Debugf("Processing channel create tx for channel %s on system channel %s", channelID, s.support.ChannelID())

	// If the channel ID does not match the system channel, then this must be a channel creation transaction

	bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
	if err != nil {
		return nil, 0, err
	}

	newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
	if err != nil {
		return nil, 0, errors.WithMessagef(err, "error validating channel creation transaction for new channel '%s', could not successfully apply update to template configuration", channelID)
	}

	newChannelEnvConfig, err := protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
	if err != nil {
		return nil, 0, err
	}

	wrappedOrdererTransaction, err := protoutil.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChannelID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
	if err != nil {
		return nil, 0, err
	}

	// We re-apply the filters here, especially for the size filter, to ensure that the transaction we
	// just constructed is not too large for our consenter.  It additionally reapplies the signature
	// check, which although not strictly necessary, is a good sanity check, in case the orderer
	// has not been configured with the right cert material.  The additional overhead of the signature
	// check is negligible, as this is the channel creation path and not the normal path.
	err = s.StandardChannel.filters.Apply(wrappedOrdererTransaction)
	if err != nil {
		return nil, 0, err
	}

	return wrappedOrdererTransaction, s.support.Sequence(), nil
}
  • protoutil.ChannelID獲取訊息中的通道ID
  • 如果訊息ID與當前訊息通道ID一致,交給標準通道處理器處理StandardChannel.ProcessConfigUpdateMsg
  • NewChannelConfig建立新的應用通道
  • ProposeConfigUpdate構造新的通道交易設定資訊(ConfigEnvelope型別)
  • CreateSignedEnvelope分別建立HeaderType_CONFIG和HeaderType_ORDERER_TRANSACTION的設定交易資訊
  • StandardChannel.filters.Apply系統通道的訊息過濾器
  • 呼叫s.support.Sequence獲取通道設定序號configSeq
func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
	logger.Debugf("Processing config update message for existing channel %s", s.support.ChannelID())

	// Call Sequence first.  If seq advances between proposal and acceptance, this is okay, and will cause reprocessing
	// however, if Sequence is called last, then a success could be falsely attributed to a newer configSeq
	seq := s.support.Sequence()
	err = s.filters.Apply(env)
	......
	configEnvelope, err := s.support.ProposeConfigUpdate(env)
	......
	config, err = protoutil.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChannelID(), s.support.Signer(), configEnvelope, msgVersion, epoch)
	......
	err = s.filters.Apply(config)
   ......
	err = s.maintenanceFilter.Apply(config)
	return config, seq, nil
}