CS144 計算機網路 Lab3:TCP Sender

2023-04-30 18:00:55

前言

在 Lab2 中我們實現了 TCP Receiver,負責在收到報文段之後將資料寫入重組器中,並回復給傳送方確認應答號。在 Lab3 中,我們將實現 TCP 連線的另一個端點——傳送方,負責讀取 ByteStream(由傳送方上層應用程式建立並寫入資料),並將位元組流轉換為報文段傳送給接收方。

程式碼實現

TCP Sender 將負責:

  • 跟蹤 TCP Receiver 的視窗,處理確認應答號和視窗大小
  • 通過從 ByteStream 中讀取內容來填充傳送視窗,建立新的報文段(可以包含 SYN 和 FIN 標誌),並行送它們
  • 跟蹤哪些分段已傳送但尚未被接收方確認——我們稱之為未完成報文段(outstanding segment)
  • 如果傳送報文段後經過足夠長的時間仍未得到確認,則重新傳送未完成的報文段

由於涉及到超時處理,我們可以先實現一個簡單的定時器 Timer,類宣告如下所示:

class Timer {
  private:
    uint32_t _rto;          // 超時時間
    uint32_t _remain_time;	// 剩餘時間
    bool _is_running;		// 是否在執行

  public:
    Timer(uint32_t rto);

    // 啟動計時器
    void start();

    // 停止計時器
    void stop();

    // 是否超時
    bool is_time_out();

    // 設定過去了多少時間
    void elapse(size_t eplased);

    // 設定超時時間
    void set_time_out(uint32_t duration);
};

根據實驗指導書的要求,定時器不能通過呼叫系統時間函數來知道過了多長時間,而是由外部傳入的時長引數告知,這一點可以從 send_retx.cc 測試用例得到印證:

TCPSenderTestHarness test{"Retx SYN twice at the right times, then ack", cfg};
test.execute(ExpectSegment{}.with_no_flags().with_syn(true).with_payload_size(0).with_seqno(isn));
test.execute(ExpectNoSegment{});
test.execute(ExpectState{TCPSenderStateSummary::SYN_SENT});

// 外部指定逝去的時間
test.execute(Tick{retx_timeout - 1u});

所以這個定時器的實現就很簡單,外部通過呼叫 Timer::elapse() 告知定時器多久過去了,定時器只要更新一下剩餘時長就好了:


Timer::Timer(uint32_t rto) : _rto(rto), _remain_time(rto), _is_running(false) {}

void Timer::start() {
    _is_running = true;
    _remain_time = _rto;
}

void Timer::stop() { _is_running = false; }

bool Timer::is_time_out() { return _remain_time == 0; }

void Timer::elapse(size_t elapsed) {
    if (elapsed > _remain_time) {
        _remain_time = 0;
    } else {
        _remain_time -= elapsed;
    }
}

void Timer::set_time_out(uint32_t duration) {
    _rto = duration;
    _remain_time = duration;
}

完成定時器之後,來看看 TCPSender 類有哪些成員:

class TCPSender {
  private:
    //! our initial sequence number, the number for our SYN.
    WrappingInt32 _isn;

    //! outbound queue of segments that the TCPSender wants sent
    std::queue<TCPSegment> _segments_out{};

    // 未被確認的報文段
    std::queue<std::pair<TCPSegment, uint64_t>> _outstand_segments{};

    //! retransmission timer for the connection
    unsigned int _initial_retransmission_timeout;

    //! outgoing stream of bytes that have not yet been sent
    ByteStream _stream;

    //! the (absolute) sequence number for the next byte to be sent
    uint64_t _next_seqno{0};

    // ackno checkpoint
    uint64_t _ack_seq{0};

    // 連續重傳次數
    uint32_t _consecutive_retxs{0};

    // 未被確認的序號長度
    uint64_t _outstand_bytes{0};

    // 接收方視窗長度
    uint16_t _window_size{1};

    // 是否同步
    bool _is_syned{false};

    // 是否結束
    bool _is_fin{false};

    // 計時器
    Timer _timer;

  public:
    //! Initialize a TCPSender
    TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
              const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
              const std::optional<WrappingInt32> fixed_isn = {});

    //! \name "Input" interface for the writer
    ByteStream &stream_in() { return _stream; }
    const ByteStream &stream_in() const { return _stream; }

    //! \brief A new acknowledgment was received
    bool ack_received(const WrappingInt32 ackno, const uint16_t window_size);

    //! \brief Generate an empty-payload segment (useful for creating empty ACK segments)
    void send_empty_segment();

    // 傳送報文段
    void send_segment(std::string &&data, bool syn = false, bool fin = false);

    //! \brief create and send segments to fill as much of the window as possible
    void fill_window();

    //! \brief Notifies the TCPSender of the passage of time
    void tick(const size_t ms_since_last_tick);

    //! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
    size_t bytes_in_flight() const;

    //! \brief Number of consecutive retransmissions that have occurred in a row
    unsigned int consecutive_retransmissions() const;

    //! \brief TCPSegments that the TCPSender has enqueued for transmission.
    std::queue<TCPSegment> &segments_out() { return _segments_out; }

    //! \brief absolute seqno for the next byte to be sent
    uint64_t next_seqno_absolute() const { return _next_seqno; }

    //! \brief relative seqno for the next byte to be sent
    WrappingInt32 next_seqno() const { return wrap(_next_seqno, _isn); }
};

可以看到,我們 TCPSender 有以下主要成員:

  • queue<TCPSegment> _segments_out:待傳送的報文段佇列,外部程式會從這個佇列裡面取出報文段並行送出去

  • queue<pair<TCPSegment, uint64_t>> _outstand_segments:存放未被確認的報文段和它對應的絕對序列號的佇列

  • uint64_t _ack_seq:上一次收到的絕對確認應答號

  • uint32_t _consecutive_retxs最早傳送的但是未被確認的報文段的重傳次數,用於更新超時時間

  • uint64_t _outstand_bytes:所有未被確認的報文段所佔序列號空間長度,SYN 和 FIN 也要佔用一個序號

  • uint16_t _window_size:接收方視窗大小,初始值為 1,由於沒有實現加性遞增乘性遞減(AIMD)擁塞控制機制,所以不用維護傳送方的擁塞視窗大小,直接維護接收方視窗大小

  • bool _is_syned:是否成功同步

  • bool _is_fin:是否關閉連線

  • Timer _timer:定時器

先來實現一些比較簡單的函數:

//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity)
    , _timer(retx_timeout) {}

uint64_t TCPSender::bytes_in_flight() const { return _outstand_bytes; }

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retxs; }

丟包處理

根據實驗指導書中的描述:

Periodically, the owner of the TCPSender will call the TCPSender’s tick method, indicating the passage of time.

外部會定期呼叫 TCPSender::tick() 函數來告知它過了多長時間,TCPSender 要根據傳入的時間判斷最早傳送的包是不是超時未被確認,如果是(定時器溢位),就說明這個包丟掉了,需要重傳。

同時超時也意味著網路可能比較擁擠,沿途的某個路由器內部佇列滿了,再次傳送也有可能丟失,不僅浪費了頻寬,還會進一步加劇網路的擁堵。不如耐心點,把超時時間翻倍,如果下一次成功收到確認應答號就還原成初始超時時間。這個超時時間估計機制和 CS144 第 61 集和《計算機網路:自頂而下方法》第 158 頁所講授的指數移動平均機制不太一樣:

值得注意的是,實驗指導書中只將超時作為重傳的條件,而沒有考慮三次冗餘 ACK 觸發快速重傳情況。因此 Timer::tick() 的程式碼實現如下:

//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
    // 更新定時器
    _timer.elapse(ms_since_last_tick);
    if (!_timer.is_time_out())
        return;

    // 超時需要重發第一個報文段,同時將超時時間翻倍
    _segments_out.push(_outstand_segments.front().first);

    _consecutive_retxs += 1;
    _timer.set_time_out(_initial_retransmission_timeout * (1 << _consecutive_retxs));
    _timer.start();
}

這裡只重傳了一個報文段,而不是像回退 N 步(GBN)協定那樣重傳整個視窗內的報文段,這是因為 Lab2 中實現的接收方會快取所有亂序到達的報文段,而 GBN 是直接將其丟棄掉了。如果我們重傳的包被成功接收了,並且使接收方成功重組了整個傳送視窗內的資料,就不需要重傳後續的報文段了。如果沒有成功重組,仍有部分資料缺失,接收方會回覆一個它想要的報文段的序號,到時候重傳這個報文段就行了。

傳送報文段

傳送方需要根據接收方的確認應答號和視窗大小決定需要傳送哪些資料,假設當前資料接收情況如下圖所示,綠色和藍色的部分是已成功接收並重組的資料,紅色部分是成功接收但是因為前方有報文沒達到而未重組的資料:

假設最後一個紅色矩形就是上次傳送的最後一個報文段,那麼 TCPSender 的各個成員的值就是圖中所標註的那樣,這時候呼叫 TCPSender::fill_window() 傳送的應該是 _next_seq ~ _ack_seq + _window_size 之間的資料。不過在傳送資料之前需要完成三次握手,所以需要先判斷 _is_syned 是否為 true,如果為 false 就需要傳送 SYN 包與接收端進行連線。所有資料都傳送完成之後需要傳送一個 FIN 報文段(可以攜帶最後一批資料或者不懈攜帶任何資料)說明 TCPSender 已經沒有新資料要傳送了,可以斷開連線了。


void TCPSender::fill_window() {
    if (!_is_syned) {
        // 等待 SYN 超時
        if (!_outstand_segments.empty())
            return;

        // 傳送一個 SYN 包
        send_segment("", true);
    } else {
        size_t remain_size = max(_window_size, static_cast<uint16_t>(1)) + _ack_seq - _next_seqno;

        // 當緩衝區中有待傳送資料時就傳送資料包文段
        while (remain_size > 0 && !_stream.buffer_empty()) {
            auto ws = min(min(remain_size, TCPConfig::MAX_PAYLOAD_SIZE), _stream.buffer_size());
            remain_size -= ws;

            string &&data = _stream.peek_output(ws);
            _stream.pop_output(ws);

            // 置位 FIN
            _is_fin |= (_stream.eof() && !_is_fin && remain_size > 0);
            send_segment(std::move(data), false, _is_fin);
        }

        // 緩衝區輸入結束時傳送 FIN(緩衝區為空時不會進入迴圈體,需要再次傳送)
        if (_stream.eof() && !_is_fin && remain_size > 0) {
            _is_fin = true;
            send_segment("", false, true);
        }
    }
}


void TCPSender::send_segment(string &&data, bool syn, bool fin) {
    // 建立報文段
    TCPSegment segment;
    segment.header().syn = syn;
    segment.header().fin = fin;
    segment.header().seqno = next_seqno();
    segment.payload() = std::move(data);

    // 將報文段放到傳送佇列中
    _segments_out.push(segment);
    _outstand_segments.push({segment, _next_seqno});

    // 更新序號
    auto len = segment.length_in_sequence_space();
    _outstand_bytes += len;
    _next_seqno += len;
}


void TCPSender::send_empty_segment() {
    TCPSegment seg;
    seg.header().seqno = next_seqno();
    _segments_out.push(seg);
}

這裡有一個地方值得思考的問題是:把同一個報文段儲存到兩個佇列中不會導致資料的拷貝嗎?實際上不會,因為 TCPSegment::_payload 的資料型別是 Buffer,它的宣告如下所示:

//! \brief A reference-counted read-only string that can discard bytes from the front
class Buffer {
  private:
    std::shared_ptr<std::string> _storage{};
    size_t _starting_offset{};

  public:
    Buffer() = default;

    //! \brief Construct by taking ownership of a string
    Buffer(std::string &&str) noexcept : _storage(std::make_shared<std::string>(std::move(str))) {}

    //! \name Expose contents as a std::string_view
    std::string_view str() const {
        if (not _storage) {
            return {};
        }
        return {_storage->data() + _starting_offset, _storage->size() - _starting_offset};
    }

    operator std::string_view() const { return str(); }

    //! \brief Get character at location `n`
    uint8_t at(const size_t n) const { return str().at(n); }

    //! \brief Size of the string
    size_t size() const { return str().size(); }

    //! \brief Make a copy to a new std::string
    std::string copy() const { return std::string(str()); }

    //! \brief Discard the first `n` bytes of the string (does not require a copy or move)
    //! \note Doesn't free any memory until the whole string has been discarded in all copies of the Buffer.
    void remove_prefix(const size_t n);
};

可以看到 Buffer 內部使用智慧指標 shared_ptr<string> _storage 共用了同一份字串,當 queue.push(buffer) 的時候呼叫了 Buffer(const Buffer &) 拷貝建構函式,只對 _storage 指標進行賦值而不涉及字串複製操作。同時 Buffer(string &&str) 建構函式接受右值,可以直接把傳入的字串偷取過來,無需拷貝,效率是很高的。

確認應答號處理

當傳送方收到確認應答號時,需要判斷這個應答號是否合法,如果收到的確認引導號落在傳送視窗以外,就不去管它。否則需要重置超時時間為初始值,並移除 _outstand_segments 佇列中絕對序列號小於絕對確認應答號的報文段。如果不存在未確認的報文段了就關閉定時器,否則得再次啟動定時器,為重傳下一個報文段做準備。

//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
//! \returns `false` if the ackno appears invalid (acknowledges something the TCPSender hasn't sent yet)
bool TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    auto ack_seq = unwrap(ackno, _isn, _ack_seq);

    if (ack_seq == 0)
        return true;

    // absolute ackno 不能落在視窗外
    if (_is_syned && ack_seq > _next_seqno)
        return false;

    _is_syned = true;
    _window_size = window_size;
    _ack_seq = ack_seq;

    // 重置超時時間為初始值
    _timer.set_time_out(_initial_retransmission_timeout);
    _consecutive_retxs = 0;

    // 移除已被確認的報文段
    while (!_outstand_segments.empty()) {
        auto &[segment, seqno] = _outstand_segments.front();
        if (seqno >= ack_seq)
            break;

        _outstand_bytes -= segment.length_in_sequence_space();
        _outstand_segments.pop();
    }

    // 再次填滿傳送視窗
    fill_window();

    // 如果還有沒被確認的報文段就重啟計時器
    if (!_outstand_segments.empty())
        _timer.start();
    else
        _timer.stop();

    return true;
}

測試

在命令列中輸入下述程式碼就能編譯並測試所有與傳送方有關的測試用例:

cd build
make -j8
ctest -R send_

測試結果如下,發現全部成功通過了:

總結

相比於 Lab2,Lab3 的難度更高,因為實驗指導書的說明並不是很充分,很多東西還是靠複習課本和偵錯測試用例搞明白的,不過有一說一,CS144 的測試用例寫的是真的好,程式碼很整潔,也用了建造者模式等設計模式,還是很值得學習的。期待最後一個實驗 Lab4,以上~~