CS144 計算機網路 Lab4:TCP Connection

2023-05-06 21:00:21

前言

經過前面幾個實驗的鋪墊,終於到了將他們組合起來的時候了。Lab4 將實現 TCP Connection 功能,內部含有 TCPReceiverTCPSender,可以與 TCP 連線的另一個端點進行資料交換。

實驗要求

簡單來說,這次實驗就是要在 TCPConnection 類中實現下圖所示的有限狀態機:

這些狀態對應 TCPState 的內部列舉類 State

//! \brief Official state names from the [TCP](\ref rfc::rfc793) specification
enum class State {
    LISTEN = 0,   //!< Listening for a peer to connect
    SYN_RCVD,     //!< Got the peer's SYN
    SYN_SENT,     //!< Sent a SYN to initiate a connection
    ESTABLISHED,  //!< Three-way handshake complete
    CLOSE_WAIT,   //!< Remote side has sent a FIN, connection is half-open
    LAST_ACK,     //!< Local side sent a FIN from CLOSE_WAIT, waiting for ACK
    FIN_WAIT_1,   //!< Sent a FIN to the remote side, not yet ACK'd
    FIN_WAIT_2,   //!< Received an ACK for previously-sent FIN
    CLOSING,      //!< Received a FIN just after we sent one
    TIME_WAIT,    //!< Both sides have sent FIN and ACK'd, waiting for 2 MSL
    CLOSED,       //!< A connection that has terminated normally
    RESET,        //!< A connection that terminated abnormally
};

除了三次握手和四次揮手外,我們還得處理報文段首部 RST 標誌被置位的情況,這時候應該將斷開連線,並將內部的輸入流和輸入流標記為 error,此時的 TCPState 應該是 RESET

程式碼實現

先在類宣告裡面加上一些成員:

class TCPConnection {
  private:
    TCPConfig _cfg;
    TCPReceiver _receiver{_cfg.recv_capacity};
    TCPSender _sender{_cfg.send_capacity, _cfg.rt_timeout, _cfg.fixed_isn};

    //! outbound queue of segments that the TCPConnection wants sent
    std::queue<TCPSegment> _segments_out{};

    //! Should the TCPConnection stay active (and keep ACKing)
    //! for 10 * _cfg.rt_timeout milliseconds after both streams have ended,
    //! in case the remote TCPConnection doesn't know we've received its whole stream?
    bool _linger_after_streams_finish{true};

    bool _is_active{true};

    size_t _last_segment_time{0};

    /**
     * @brief 傳送報文段
     * @param fill_window 是否填滿傳送視窗
    */
    void send_segments(bool fill_window = false);

    // 傳送 RST 報文段
    void send_rst_segment();

    // 中止連線
    void abort();

  public:
    // 省略其餘成員
}

接著實現幾個最簡單的成員函數:

size_t TCPConnection::remaining_outbound_capacity() const { return _sender.stream_in().remaining_capacity(); }

size_t TCPConnection::bytes_in_flight() const { return _sender.bytes_in_flight(); }

size_t TCPConnection::unassembled_bytes() const { return _receiver.unassembled_bytes(); }

size_t TCPConnection::time_since_last_segment_received() const { return _last_segment_time; }

bool TCPConnection::active() const { return _is_active; }

主動連線

使用者端可以呼叫 TCPConnection::connect 函數傳送 SYN 報文段請求與伺服器端建立連線,由於 Lab3 中實現的 TCPSender::fill_window() 函數會根據傳送方的狀態選擇要傳送的報文段型別,在還沒建立連線的情況下,這裡直接呼叫 fill_window() 就會將一個 SYN 報文段放在佇列中,我們只需將其取出放到 TCPConnection_segments_out 佇列中即可:

void TCPConnection::connect() {
    // 傳送 SYN
    send_segments(true);
}

void TCPConnection::send_segments(bool fill_window) {
    if (fill_window)
        _sender.fill_window();

    auto &segments = _sender.segments_out();


    while (!segments.empty()) {
        auto seg = segments.front();

        // 設定 ACK、確認應答號和接收視窗大小
        if (_receiver.ackno()) {
            seg.header().ackno = _receiver.ackno().value();
            seg.header().win = _receiver.window_size();
            seg.header().ack = true;
        }

        _segments_out.push(seg);
        segments.pop();
    }
}

主動關閉

當上層程式沒有更多資料需要傳送時,將會呼叫 TCPConnection::end_input_stream() 結束輸入,這時候需要傳送 FIN 報文段給伺服器端,告訴他自己沒有更多資料要傳送了,但是可以繼續接收伺服器端發來的資料。使用者端的狀態由 ESTABLISHED 轉移到 FIN_WAIT_1,伺服器端收到 FIN 之後變成 CLOSE_WAIT 狀態,並回復 ACK 給使用者端,使用者端收到之後接著轉移到 FIN_WAIT_2 狀態。

如果伺服器端資料傳輸完成了,會傳送 FIN 報文段給使用者端,轉移到 LAST_ACK 狀態,此時使用者端會回覆最後一個 ACK 給伺服器端並進入 TIME_WAIT 超時等待狀態,如果這個等待時間內沒有收到伺服器端重傳的 FIN,就說明 ACK 順利到達了伺服器端且伺服器端已經變成 CLOSED 狀態了,此時使用者端也能斷開連線變成 CLOSED 了。

void TCPConnection::end_input_stream() {
    // 傳送 FIN
    _sender.stream_in().end_input();
    send_segments(true);
}

在上述情景中,使用者端是主動關閉(Active Close)的一方,伺服器端是被動關閉(Passive Close)的一方。

主動重置連線

有兩種情況會導致傳送 RST 報文段來主動重置連線:

  • TCPSender 超時重傳的次數過多時,表明通訊鏈路存在故障;
  • TCPConnect 物件被釋放但是 TCP 仍然處於連線狀態的時候;

和 Lab3 中類似,TCPConnection 通過外部定期呼叫 tick() 函數來得知過了多長時間,在 tick() 函數裡還得處理超時等待的情況:

//! \param[in] ms_since_last_tick number of milliseconds since the last call to this method
void TCPConnection::tick(const size_t ms_since_last_tick) {
    _sender.tick(ms_since_last_tick);

    // 重傳次數太多時需要斷開連線
    if (_sender.consecutive_retransmissions() > _cfg.MAX_RETX_ATTEMPTS) {
        return send_rst_segment();
    }

    // 重傳封包
    send_segments();

    _last_segment_time += ms_since_last_tick;

    //  TIME_WAIT 超時等待狀態轉移到 CLOSED 狀態
    if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
        TCPState::state_summary(_sender) == TCPSenderStateSummary::FIN_ACKED &&
        _last_segment_time >= 10 * _cfg.rt_timeout) {
        _linger_after_streams_finish = false;
        _is_active = false;
    }
}

TCPConnection::~TCPConnection() {
    try {
        if (active()) {
            cerr << "Warning: Unclean shutdown of TCPConnection\n";

            // Your code here: need to send a RST segment to the peer
            send_rst_segment();
        }
    } catch (const exception &e) {
        std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
    }
}

void TCPConnection::send_rst_segment() {
    abort();
    TCPSegment seg;
    seg.header().rst = true;
    _segments_out.push(seg);
}

void TCPConnection::abort() {
    _is_active = false;
    _sender.stream_in().set_error();
    _receiver.stream_out().set_error();
}

接收報文段

外部通過 TCPConnection::segment_received() 將接收到的報文段傳給它,在這個函數內部,需要將確認應答號和接收視窗大小告訴 TCPSender,好讓他接著填滿傳送視窗。接著還需要把報文段傳給 TCPReceiver 來重組資料,並更新確認應答號和自己的接收視窗大小。然後 TCPSender 需要根據收到的包型別進行狀態轉移,並決定傳送含有有效資料的報文段還是空 ACK 給對方。

為什麼即使沒有新的資料要傳送也要回復一個空 ACK 呢?因為如果不這麼做,對方會以為剛剛發的包丟掉了而一直重傳。

void TCPConnection::segment_received(const TCPSegment &seg) {
    if (!active())
        return;

    _last_segment_time = 0;

    // 是否需要傳送空包回覆 ACK,比如沒有資料的時候收到 SYN/ACK 也要回一個 ACK
    bool need_empty_ack = seg.length_in_sequence_space();

    auto &header = seg.header();

    // 處理 RST 標誌位
    if (header.rst)
        return abort();

    // 將包交給傳送者
    if (header.ack) {
        need_empty_ack |= !_sender.ack_received(header.ackno, header.win);

        // 佇列中已經有資料包文段了就不需要專門的空包回覆 ACK
        if (!_sender.segments_out().empty())
            need_empty_ack = false;
    }

    // 將包交給接受者
    need_empty_ack |= !_receiver.segment_received(seg);

    // 被動連線
    if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::SYN_RECV &&
        TCPState::state_summary(_sender) == TCPSenderStateSummary::CLOSED)
        return connect();

    // 被動關閉
    if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
        TCPState::state_summary(_sender) == TCPSenderStateSummary::SYN_ACKED)
        _linger_after_streams_finish = false;

    // LAST_ACK 狀態轉移到 CLOSED
    if (TCPState::state_summary(_receiver) == TCPReceiverStateSummary::FIN_RECV &&
        TCPState::state_summary(_sender) == TCPSenderStateSummary::FIN_ACKED && !_linger_after_streams_finish) {
        _is_active = false;
        return;
    }

    if (need_empty_ack && TCPState::state_summary(_receiver) != TCPReceiverStateSummary::LISTEN)
        _sender.send_empty_segment();

    // 傳送其餘報文段
    send_segments();
}

測試

在終端中輸入 make check_lab4 就能執行所有測試用例,測試結果如下:

發現有幾個 txrx.sh 的測試用例失敗了,但是單獨執行這些測試用例卻又可以通過,就很奇怪:

接著測試一下吞吐量(請確保構建型別是 Release 而不是 Debug),感覺還行, 0.71Gbit/s,超過了實驗指導書要求的 0.1Gbit/s。但是實際上還可以優化一下 ByteStream 類,將內部資料型別換成 BufferList,這樣在寫入資料的時候就不用一個字元一個字元插入佇列了,可以大大提高效率。

最後將 Lab0 中 webget 使用的 TCPSocket 換成 CS144TCPSocket,重新編譯並執行 webegt,發現能夠正確得到響應結果,說明我們實現的這個 CS144TCPSocket 已經能和別的作業系統實現的 Socket 進行交流了:

後記

至此,CS144 的 TCP 實驗部分已全部完成,可以說是比較有挑戰性的一次實驗了,尤其是 Lab4 部分,各種奇奇怪怪的 bug,編碼一晚上,偵錯時長兩天半(約等於一坤天),偵錯的時候斷點還總是失效,最後發現是優化搞的鬼,需要將 etc/cflags.cmake 第 18 行改為 set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -ggdb3 -O0") 才行。以上~~