在介紹之前先說明一些概念:其中有兩個結構很重要,一個是 flow,一個是 miniflow。這裡先介紹一下它們的資料結構和建構函式。
/* Simplified view of the two structures discussed in this article.
 *
 * A flowmap is a bitmap made of FLOWMAP_UNITS words of type map_t.  It is
 * defined first because struct miniflow embeds it by value and therefore
 * needs the complete type. */
struct flowmap {
    map_t bits[FLOWMAP_UNITS];
};

/* A miniflow starts with the flowmap header. */
struct miniflow {
    struct flowmap map;
};
//flow是8位元組對齊的,除8得到flow中包含8位元組的個數 #define FLOW_U64S (sizeof(struct flow) / sizeof(uint64_t)) //map大小為8位元組,MAP_T_BITS 為64位元 typedef unsigned long long map_t; #define MAP_T_BITS (sizeof(map_t) * CHAR_BIT) //每位表示一個u64,FLOWMAP_UNITS 表示最少需要幾個64位元 #define FLOWMAP_UNITS DIV_ROUND_UP(FLOW_U64S, MAP_T_BITS) struct flowmap { map_t bits[FLOWMAP_UNITS]; }; struct miniflow { struct flowmap map; /* Followed by: * uint64_t values[n]; * where 'n' is miniflow_n_values(miniflow). */ }; struct netdev_flow_key { uint32_t hash; uint32_t len; struct miniflow mf; // bits uint64_t buf[FLOW_MAX_PACKET_U64S]; // 就是上邊所說的value }; // 有些欄位是互斥的 #define FLOW_MAX_PACKET_U64S (FLOW_U64S \ /* Unused in datapath */ - FLOW_U64_SIZE(regs) \ - FLOW_U64_SIZE(metadata) \ /* L2.5/3 */ - FLOW_U64_SIZE(nw_src) /* incl. nw_dst */ \ - FLOW_U64_SIZE(mpls_lse) \ /* L4 */ - FLOW_U64_SIZE(tp_src) \ )
/* Abridged excerpt of miniflow_extract(): parses 'packet' and stores the
 * result in 'dst'.  Every "..." marks code the article left out, and the
 * excerpt stops before the end of the function. */
void
miniflow_extract(struct dp_packet *packet, struct miniflow *dst)
{
    ...
    /* Initialization hinges on two things.  The first is 'values':
     * miniflow_values() is described by the original note as
     * "return (uint64_t *)(mf + 1);", i.e. the value array that directly
     * follows the miniflow header. */
    uint64_t *values = miniflow_values(dst);
    struct mf_ctx mf = { FLOWMAP_EMPTY_INITIALIZER, values,
                         values + FLOW_U64S };
    ...
    if (md->skb_priority || md->pkt_mark) {
        miniflow_push_uint32(mf, skb_priority, md->skb_priority);
        miniflow_push_uint32(mf, pkt_mark, md->pkt_mark);
    }
    miniflow_push_be16(mf, dl_type, dl_type);
    miniflow_pad_to_64(mf, dl_type);
    ...
    /* Network-layer extraction.  Per the original note, the branches elided
     * here show that OVS currently handles IP, IPv6, ARP and RARP packets
     * (NOTE(review): the elided branches are not visible, confirm). */
    if (OVS_LIKELY(dl_type == htons(ETH_TYPE_IP))){...} else if ...
    /* Transport-layer extraction.  Per the original note, the supported
     * protocols are TCP, UDP, SCTP, ICMP and ICMPv6 (NOTE(review): confirm
     * against the elided branches). */
    if (OVS_LIKELY(nw_proto == IPPROTO_TCP)){...} else if ...
/* Pushes the 32-bit 'VALUE' for member 'FIELD' of struct flow into the
 * miniflow context 'MF'.  The member's byte offset picks the code path in
 * miniflow_push_uint32_(). */
#define miniflow_push_uint32(MF, FIELD, VALUE)                      \
    miniflow_push_uint32_(MF, offsetof(struct flow, FIELD), VALUE)

/* 'OFS' is the byte offset of the member inside struct flow.
 *
 * Comments inside the macro body use block-comment syntax on purpose: a
 * line comment ending in a backslash would swallow the next line. */
#define miniflow_push_uint32_(MF, OFS, VALUE)   \
{                                               \
    MINIFLOW_ASSERT(MF.data < MF.end);          \
                                                \
    /* The member starts a 64-bit word: set   */\
    /* the map bit for word OFS / 8 and write */\
    /* the first 32 bits of the current word. */\
    if ((OFS) % 8 == 0) {                       \
        miniflow_set_map(MF, OFS / 8);          \
        *(uint32_t *)MF.data = VALUE;           \
    } else if ((OFS) % 8 == 4) {                \
        /* The member is the second half of a */\
        /* word: that word's bit must already */\
        /* be set.  Fill the upper 32 bits,   */\
        /* then advance to the next word.     */\
        miniflow_assert_in_map(MF, OFS / 8);    \
        *((uint32_t *)MF.data + 1) = VALUE;     \
        MF.data++;                              \
    }                                           \
}
/* Initializes 'dst' as a copy of 'src'. */ void miniflow_expand(const struct miniflow *src, struct flow *dst) { memset(dst, 0, sizeof *dst); flow_union_with_miniflow(dst, src); } /* Perform a bitwise OR of miniflow 'src' flow data with the equivalent * fields in 'dst', storing the result in 'dst'. */ static inline void flow_union_with_miniflow(struct flow *dst, const struct miniflow *src) { flow_union_with_miniflow_subset(dst, src, src->map); } static inline void flow_union_with_miniflow_subset(struct flow *dst, const struct miniflow *src, struct flowmap subset) { uint64_t *dst_u64 = (uint64_t *) dst; const uint64_t *p = miniflow_get_values(src); map_t map; //遍歷所有的map FLOWMAP_FOR_EACH_MAP (map, subset) { size_t idx; //遍歷map中所有的非0bit MAP_FOR_EACH_INDEX(idx, map) { dst_u64[idx] |= *p++; } dst_u64 += MAP_T_BITS; } }
/* Processes one received batch: cache lookup, classifier lookup for the
 * misses, grouping of packets per flow, then execution of each group's
 * actions.
 *
 * 'md_is_valid' and 'port_no' are forwarded unchanged to dfc_processing(). */
static void
dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
                  struct dp_packet_batch *packets,
                  bool md_is_valid, odp_port_t port_no)
{
#if !defined(__CHECKER__) && !defined(_WIN32)
    const size_t PKT_ARRAY_SIZE = dp_packet_batch_size(packets);
#else
    /* Sparse or MSVC doesn't like variable length array. */
    enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
    OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
        struct netdev_flow_key keys[PKT_ARRAY_SIZE];
    struct netdev_flow_key *missed_keys[PKT_ARRAY_SIZE];
    struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
    size_t n_batches;
    struct dp_packet_flow_map flow_map[PKT_ARRAY_SIZE];
    uint8_t index_map[PKT_ARRAY_SIZE];
    size_t n_flows, i;

    odp_port_t in_port;

    n_batches = 0;
    /* 1. After dfc_processing() the packets that missed are left in
     *    'packets'.  Hits are either already batched or recorded in
     *    'flow_map'.  A 'flow_map' entry holds an unbatched packet whose
     *    flow is either known or NULL; NULL means the next lookup level
     *    still has to resolve it. */
    dfc_processing(pmd, packets, keys, missed_keys, batches, &n_batches,
                   flow_map, &n_flows, index_map, md_is_valid, port_no);

    /* 2. Anything left in 'packets' missed the caches: take the fast path,
     *    i.e. look it up in the dpcls. */
    if (!dp_packet_batch_is_empty(packets)) {
        in_port = packets->packets[0]->md.in_port.odp_port;
        fast_path_processing(pmd, packets, missed_keys,
                             flow_map, index_map, in_port);
    }

    /* Batch rest of packets which are in flow map. */
    for (i = 0; i < n_flows; i++) {
        struct dp_packet_flow_map *map = &flow_map[i];

        /* Entries that never got a flow are skipped. */
        if (OVS_UNLIKELY(!map->flow)) {
            continue;
        }
        dp_netdev_queue_batches(map->packet, map->flow, map->tcp_flags,
                                batches, &n_batches);
    }

    /* Detach every flow from its batch before executing any actions. */
    for (i = 0; i < n_batches; i++) {
        batches[i].flow->batch = NULL;
    }

    /* Execute the actions of every per-flow batch. */
    for (i = 0; i < n_batches; i++) {
        packet_batch_per_flow_execute(&batches[i], pmd);
    }
}
/* First lookup stage: tries the EMC for every packet of 'packets_' and then,
 * if SMC is enabled, hands the misses to smc_lookup_batch().
 *
 * EMC hits are either queued into 'batches' right away or, once a miss has
 * been seen, recorded in 'flow_map' so that the received order can be
 * restored later.  Misses are refilled into 'packets_' from index 0; their
 * keys go to 'missed_keys' and their 'flow_map' slot index to 'index_map'.
 *
 * '*n_flows' receives the number of 'flow_map' slots used.  Returns the
 * number of packets left in 'packets_'. */
static inline size_t
dfc_processing(struct dp_netdev_pmd_thread *pmd,
               struct dp_packet_batch *packets_,
               struct netdev_flow_key *keys,
               struct netdev_flow_key **missed_keys,
               struct packet_batch_per_flow batches[], size_t *n_batches,
               struct dp_packet_flow_map *flow_map,
               size_t *n_flows, uint8_t *index_map,
               bool md_is_valid, odp_port_t port_no)
{
    struct netdev_flow_key *key = &keys[0];
    size_t n_missed = 0, n_emc_hit = 0;
    struct dfc_cache *cache = &pmd->flow_cache;
    struct dp_packet *packet;
    size_t cnt = dp_packet_batch_size(packets_);
    /* EMC insertion probability; 0 disables the EMC lookup below. */
    uint32_t cur_min = pmd->ctx.emc_insert_min;
    int i;
    uint16_t tcp_flags;
    bool smc_enable_db;
    /* Number of 'flow_map' slots used so far (unbatched packets). */
    size_t map_cnt = 0;
    /* Order-preservation switch: cleared at the first miss. */
    bool batch_enable = true;

    /* Read whether SMC is enabled. */
    atomic_read_relaxed(&pmd->dp->smc_enable_db, &smc_enable_db);
    pmd_perf_update_counter(&pmd->perf_stats,
                            md_is_valid ? PMD_STAT_RECIRC : PMD_STAT_RECV,
                            cnt);

    do_dfc_hook(pmd, packets_, batches, n_batches);
    /* Re-read the size after the hook has run. */
    cnt = dp_packet_batch_size(packets_);

    /* Handle every packet of the batch in turn. */
    DP_PACKET_BATCH_REFILL_FOR_EACH (i, cnt, packet, packets_) {
        struct dp_netdev_flow *flow;

        /* Drop packets shorter than an Ethernet header. */
        if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
            dp_packet_delete(packet);
            COVERAGE_INC(datapath_drop_rx_invalid_packet);
            continue;
        }

        /* Manual prefetch of the next packet to hide read latency. */
        if (i != cnt - 1) {
            struct dp_packet **packets = packets_->packets;
            /* Prefetch next packet data and metadata. */
            OVS_PREFETCH(dp_packet_data(packets[i+1]));
            pkt_metadata_prefetch_init(&packets[i+1]->md);
        }

        /* Metadata is initialized here only when the caller did not supply
         * valid metadata.  Per the original note, pkt_metadata_init()
         * zeroes the bytes before flow_in_port, sets in_port.odp_port to
         * 'port_no' and tunnel.ipv6_dst to in6addr_any. */
        if (!md_is_valid) {
            pkt_metadata_init(&packet->md, port_no);
        }

        /* Convert the packet into a miniflow. */
        miniflow_extract(packet, &key->mf);
        key->len = 0; /* Not computed yet. */
        /* Hash of this packet's miniflow. */
        key->hash =
                (md_is_valid == false)
                ? dpif_netdev_packet_get_rss_hash_orig_pkt(packet, &key->mf)
                : dpif_netdev_packet_get_rss_hash(packet, &key->mf);

        /* emc_lookup() matches on key->hash, entry liveness and the
         * miniflow.  With cur_min == 0 the EMC is not consulted at all. */
        flow = (cur_min != 0) ? emc_lookup(&cache->emc_cache, key) : NULL;
        if (OVS_LIKELY(flow)) {
            tcp_flags = miniflow_get_tcp_flags(&key->mf);
            n_emc_hit++;    /* One more EMC hit. */
            /* To keep packet order, hits that come after a miss are not
             * batched directly.  They are stored in 'flow_map' as
             * (packet, flow, tcp_flags) and merged back in order by
             * dp_netdev_input__(). */
            if (OVS_LIKELY(batch_enable)) {
                /* Queue the packet into the batch of the flow found. */
                dp_netdev_queue_batches(packet, flow, tcp_flags, batches,
                                        n_batches);
            } else {
                packet_enqueue_to_flow_map(packet, flow, tcp_flags,
                                           flow_map, map_cnt++);
            }
        } else {
            /* Miss: put the packet back into 'packets_' (refilled from
             * index 0, so only misses remain in the end) and record what
             * the SMC lookup needs. */
            dp_packet_batch_refill(packets_, packet, i);
            index_map[n_missed] = map_cnt;
            flow_map[map_cnt++].flow = NULL;
            missed_keys[n_missed] = key;
            key = &keys[++n_missed];
            /* Everything after this point goes through 'flow_map'. */
            batch_enable = false;
        }
    }
    *n_flows = map_cnt;

    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_EXACT_HIT, n_emc_hit);

    /* Without SMC there is nothing more to do here. */
    if (!smc_enable_db) {
        return dp_packet_batch_size(packets_);
    }

    smc_lookup_batch(pmd, keys, missed_keys, packets_,
                     n_missed, flow_map, index_map);

    return dp_packet_batch_size(packets_);
}
static inline struct dp_netdev_flow * emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key) { struct emc_entry *current_entry; // 這裡說一下,一個hash分配兩個桶,長度為13位,cache桶的大小為1<<13 // struct emc_cache { // struct emc_entry entries[EM_FLOW_HASH_ENTRIES]; // int sweep_idx; /* For emc_cache_slow_sweep(). */ // }; EMC_FOR_EACH_POS_WITH_HASH (cache, current_entry, key->hash) { if (current_entry->key.hash == key->hash && emc_entry_alive(current_entry) && emc_flow_key_equal_mf(¤t_entry->key, &key->mf)) { /* We found the entry with the 'key->mf' miniflow */ return current_entry->flow; } } return NULL; } #define EM_FLOW_HASH_SHIFT 13 #define EM_FLOW_HASH_ENTRIES (1u << EM_FLOW_HASH_SHIFT) #define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1) #define EM_FLOW_HASH_SEGS 2 #define EMC_FOR_EACH_POS_WITH_HASH(EMC, CURRENT_ENTRY, HASH) \ for (uint32_t i__ = 0, srch_hash__ = (HASH); \ (CURRENT_ENTRY) = &(EMC)->entries[srch_hash__ & EM_FLOW_HASH_MASK], \ i__ < EM_FLOW_HASH_SEGS; \ i__++, srch_hash__ >>= EM_FLOW_HASH_SHIFT) // 比較miniflow是否相同 static inline bool emc_flow_key_equal_mf(const struct netdev_flow_key *key, const struct miniflow *mf) { return !memcmp(&key->mf, mf, key->len); }
/* Second lookup stage: tries the SMC for the 'cnt' packets left in
 * 'packets_' after the EMC stage.
 *
 * Hits are stored in 'flow_map' at the packet's original receive position,
 * taken from 'index_map', and may also be inserted into the EMC.  Misses are
 * refilled into 'packets_' from index 0, with 'index_map' and 'missed_keys'
 * compacted to match. */
static inline void
smc_lookup_batch(struct dp_netdev_pmd_thread *pmd,
                 struct netdev_flow_key *keys,
                 struct netdev_flow_key **missed_keys,
                 struct dp_packet_batch *packets_,
                 const int cnt,
                 struct dp_packet_flow_map *flow_map,
                 uint8_t *index_map)
{
    int i;
    struct dp_packet *packet;
    size_t n_smc_hit = 0, n_missed = 0;
    struct dfc_cache *cache = &pmd->flow_cache;
    struct smc_cache *smc_cache = &cache->smc_cache;
    const struct cmap_node *flow_node;
    int recv_idx;
    uint16_t tcp_flags;

    /* Prefetch buckets for all packets */
    for (i = 0; i < cnt; i++) {
        OVS_PREFETCH(&smc_cache->buckets[keys[i].hash & SMC_MASK]);
    }

    DP_PACKET_BATCH_REFILL_FOR_EACH (i, cnt, packet, packets_) {
        struct dp_netdev_flow *flow = NULL;
        /* Head of the chain of flows registered for this hash, if any. */
        flow_node = smc_entry_get(pmd, keys[i].hash);
        bool hit = false;
        /* Get the original order of this packet in received batch. */
        recv_idx = index_map[i];

        if (OVS_LIKELY(flow_node != NULL)) {
            /* Walk the chain to find the flow that really matches.  The
             * iteration macro recovers the enclosing dp_netdev_flow from
             * its embedded 'node' member. */
            CMAP_NODE_FOR_EACH (flow, node, flow_node) {
                /* Since we dont have per-port megaflow to check the port
                 * number, we need to verify that the input ports match. */
                if (OVS_LIKELY(dpcls_rule_matches_key(&flow->cr, &keys[i]) &&
                    flow->flow.in_port.odp_port == packet->md.in_port.odp_port)) {
                    tcp_flags = miniflow_get_tcp_flags(&keys[i].mf);
                    /* The key length is only computed now because the EMC
                     * insertion below needs it. */
                    keys[i].len =
                        netdev_flow_key_size(miniflow_n_values(&keys[i].mf));
                    if (emc_probabilistic_insert(pmd, &keys[i], flow)) {
                        if (flow->status == OFFLOAD_NONE) {
                            queue_netdev_flow_put(pmd->dp->dp_flow_offload, \
                                    pmd->dp->class, \
                                    flow, NULL, DP_NETDEV_FLOW_OFFLOAD_OP_ADD);
                        }
                    }
                    packet_enqueue_to_flow_map(packet, flow, tcp_flags,
                                               flow_map, recv_idx);
                    n_smc_hit++;
                    hit = true;
                    break;
                }
            }
            if (hit) {
                continue;
            }
        }

        /* SMC missed as well: as in the EMC stage, keep the packet in
         * 'packets_', refilling from index 0. */
        dp_packet_batch_refill(packets_, packet, i);
        index_map[n_missed] = recv_idx;
        missed_keys[n_missed++] = &keys[i];
    }

    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SMC_HIT, n_smc_hit);
}
static inline const struct cmap_node * smc_entry_get(struct dp_netdev_pmd_thread *pmd, const uint32_t hash) { struct smc_cache *cache = &(pmd->flow_cache).smc_cache; // smc_cache桶的大小是(1<<18),SMC_MASK=(1<<18)- 1 // 先通過後hash的後18位元定位到桶 struct smc_bucket *bucket = &cache->buckets[hash & SMC_MASK]; // 一個桶有4個16位元的sig,存key->hash前16位元,正好是64位元 // 遍歷4個元素看那個匹配,獲得匹配後的cmap的下標 uint16_t sig = hash >> 16; uint16_t index = UINT16_MAX; for (int i = 0; i < SMC_ENTRY_PER_BUCKET; i++) { if (bucket->sig[i] == sig) { index = bucket->flow_idx[i]; break; } } // 通過index找到在dpcls裡的桶位置 if (index != UINT16_MAX) { return cmap_find_by_index(&pmd->flow_table, index); } return NULL; }
/* Inserts 'flow' under 'key' into the PMD's EMC with a probability governed
 * by pmd->ctx.emc_insert_min.
 *
 * A threshold of 0 disables insertion, and no random number is drawn in
 * that case.  Otherwise the entry is inserted when a fresh random 32-bit
 * value does not exceed the threshold.  The original comment gives the
 * default as UINT32_MAX / 100, i.e. roughly a 1% chance.
 *
 * Returns true if the entry was inserted, false otherwise. */
static inline bool
emc_probabilistic_insert(struct dp_netdev_pmd_thread *pmd,
                         const struct netdev_flow_key *key,
                         struct dp_netdev_flow *flow)
{
    const uint32_t threshold = pmd->ctx.emc_insert_min;

    if (!threshold) {
        return false;
    }
    if (random_uint32() > threshold) {
        return false;
    }

    emc_insert(&pmd->flow_cache.emc_cache, key, flow);
    return true;
}
/* Stores 'flow' under 'key' in 'cache'.
 *
 * If one of the candidate positions of 'key->hash' already holds an equal
 * key, only its flow is updated.  Otherwise one candidate is chosen as the
 * victim and overwritten with the new key and flow. */
static inline void
emc_insert(struct emc_cache *cache, const struct netdev_flow_key *key,
           struct dp_netdev_flow *flow)
{
    struct emc_entry *to_be_replaced = NULL;
    struct emc_entry *current_entry;

    EMC_FOR_EACH_POS_WITH_HASH(cache, current_entry, key->hash) {
        if (netdev_flow_key_equal(&current_entry->key, key)) {
            /* We found the entry with the 'mf' miniflow */
            emc_change_entry(current_entry, flow, NULL);
            return;
        }

        /* Replacement policy: put the flow in an empty (not alive) entry, or
         * in the first entry where it can be.
         *
         * The original author flagged this condition as puzzling.  Read
         * clause by clause, the current candidate becomes the victim when:
         *   - no victim has been chosen yet, or
         *   - the chosen victim is alive and this candidate is not, or
         *   - this candidate's stored hash is smaller than the victim's. */
        if (!to_be_replaced
            || (emc_entry_alive(to_be_replaced)
                && !emc_entry_alive(current_entry))
            || current_entry->key.hash < to_be_replaced->key.hash) {
            to_be_replaced = current_entry;
        }
    }
    /* We didn't find the miniflow in the cache.
     * The 'to_be_replaced' entry is where the new flow will be stored */

    emc_change_entry(to_be_replaced, flow, key);
}
/* Periodic housekeeping fragment from the PMD main loop.  'lc' counts
 * iterations, and the body runs once the counter has passed 1024. */
if (lc++ > 1024) {
    lc = 0;

    coverage_try_clear();
    /* Per the original note, "optimize" here means re-sorting the TSS
     * subtables. */
    dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
    dp_netdev_pmd_hook_idle_run(pmd);
#ifdef ENABLE_EMC
    /* The slow EMC sweep runs only when ovsrcu_try_quiesce() returns 0. */
    if (!ovsrcu_try_quiesce()) {
        emc_cache_slow_sweep(pmd->dp, &((pmd->flow_cache).emc_cache));
    }
#else
    ovsrcu_try_quiesce();
#endif

    /* Refresh the cached "rxq enabled" state of every polled queue whose
     * netdev change sequence number has moved. */
    for (i = 0; i < poll_cnt; i++) {
        uint64_t current_seq =
                 netdev_get_change_seq(poll_list[i].rxq->port->netdev);
        if (poll_list[i].change_seq != current_seq) {
            poll_list[i].change_seq = current_seq;
            poll_list[i].rxq_enabled =
                         netdev_rxq_enabled(poll_list[i].rxq->rx);
        }
    }
}
// 執行緒安全的 #define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; } struct cmap { OVSRCU_TYPE(struct cmap_impl *) impl; }; /* The implementation of a concurrent hash map. */ struct cmap_impl { // 補齊64位元組 PADDED_MEMBERS_CACHELINE_MARKER(CACHE_LINE_SIZE, cacheline0, unsigned int n; /* Number of in-use elements. */ unsigned int max_n; /* Max elements before enlarging. */ unsigned int min_n; /* Min elements before shrinking. */ uint32_t mask; /* Number of 'buckets', minus one. */ uint32_t basis; /* Basis for rehashing client's hash values. */ ); PADDED_MEMBERS_CACHELINE_MARKER(CACHE_LINE_SIZE, cacheline1, struct cmap_bucket buckets[1]; ); }; struct cmap_bucket { /* Padding to make cmap_bucket exactly one cache line long. */ PADDED_MEMBERS(CACHE_LINE_SIZE, // 鎖機制,讀和寫都會+1,讀的時候等到變成偶數再去讀,保證安全 atomic_uint32_t counter; // 桶中的每個槽用(hashs[i], nodes[i])元組來表示 uint32_t hashes[CMAP_K]; struct cmap_node nodes[CMAP_K]; ); }; struct cmap_node { OVSRCU_TYPE(struct cmap_node *) next; /* Next node with same hash. */ }; /* 二級匹配表.每個報文接收埠對應一個 */ struct dpcls { struct cmap_node node; /* 連結串列節點 */ odp_port_t in_port; /* 報文接收埠 */ struct cmap subtables_map; // 管理下邊subtables的索引,用於遍歷 struct pvector subtables; // 上文TSS演演算法所說的元組表 } struct pvector { // 指向具體子表資訊 OVSRCU_TYPE(struct pvector_impl *) impl; // 平時,temp都是為NULL.只有當pvector擴充時,temp才用來臨時快取資料. // 待排好序後,再拷貝到impl中,temp再置NULL struct pvector_impl *temp; }; // 相當於vector<pvector_entry> struct pvector_impl { size_t size; /* Number of entries in the vector */ size_t allocated; /* Number allocted entries */ /* 初始化的時候只有4個元素.後續可能會擴充 */ struct pvector_entry vector[]; } struct pvector_entry { // pvector_impl中的vector是按照priority從小到大排序的 // pmd_thread_main裡會把priority賦值為hit_cnt,然後排序 int priority; /* 實際指向了struct dpcls_subtable結構 */ void *ptr; } // 子表資訊 struct dpcls_subtable { /* The fields are only used by writers. */ struct cmap_node cmap_node OVS_GUARDED; /* Within dpcls 'subtables_map'. 
*/ struct cmap rules; // 該表的bucket內容 uint32_t hit_cnt; // 命中該子表的次數 // 下邊是mask的miniflow前兩個的bits裡1的個數 uint8_t mf_bits_set_unit0; uint8_t mf_bits_set_unit1; // 根據mf_bits_set_unit01選擇最合適的查詢演演算法 dpcls_subtable_lookup_func lookup_func; /* Caches the masks to match a packet to, reducing runtime calculations. */ uint64_t *mf_masks; // 由下邊的mask->mf->bits[01]得來的, struct netdev_flow_key mask; // 該表的掩碼資訊 }; 關於上邊的mf_masks與mask,舉個例子 mf_bits_set_unit0 = 4, mf_bits_set_unit1 = 0 netdev_flow_key.mf.bits[0] = 111010 (2進位制) mf_masks = [1, 111, 1111, 11111] (2進位制)
/* Looks up the 'cnt' keys in 'keys' in every subtable of 'cls', in pvector
 * order, until all keys are matched or the subtables are exhausted.
 *
 * 'rules[i]' receives the matching rule for 'keys[i]', or stays NULL.  If
 * 'num_lookups_p' is nonnull it receives the sum, over the matched packets,
 * of the number of subtables searched to find each match.
 *
 * Returns true if every key matched, false otherwise.
 *
 * NOTE(review): the shift below is by MAP_BITS - cnt, so cnt == 0 would
 * shift by the full type width; callers presumably never pass an empty
 * batch.  Confirm. */
static bool
dpcls_lookup(struct dpcls *cls, const struct netdev_flow_key *keys[],
             struct dpcls_rule **rules, const size_t cnt,
             int *num_lookups_p)
{
#define MAP_BITS (sizeof(uint32_t) * CHAR_BIT)
    BUILD_ASSERT_DECL(MAP_BITS >= NETDEV_MAX_BURST);

    struct dpcls_subtable *subtable;
    uint32_t keys_map = TYPE_MAXIMUM(uint32_t); /* Set all bits. */

    if (cnt != MAP_BITS) {
        /* Leave exactly 'cnt' low bits set; bit i stands for packet i. */
        keys_map >>= MAP_BITS - cnt; /* Clear extra bits. */
    }
    memset(rules, 0, cnt * sizeof *rules);

    int lookups_match = 0, subtable_pos = 1;
    uint32_t found_map;

    PVECTOR_FOR_EACH (subtable, &cls->subtables) {
        /* Subtable-specific lookup routine, e.g.
         * dpcls_subtable_lookup_generic(). */
        found_map = subtable->lookup_func(subtable, keys_map, keys, rules);

        uint32_t pkts_matched = count_1bits(found_map);
        /* Each packet matched here needed 'subtable_pos' subtable
         * searches. */
        lookups_match += pkts_matched * subtable_pos;

        /* Packets that matched need no further searching. */
        keys_map &= ~found_map;
        if (!keys_map) {
            if (num_lookups_p) {
                *num_lookups_p = lookups_match;
            }
            /* Every key was found. */
            return true;
        }
        subtable_pos++;
    }

    if (num_lookups_p) {
        *num_lookups_p = lookups_match;
    }
    /* At least one key was not found. */
    return false;
}
/* Entry point of the generic subtable lookup.  It only forwards to
 * lookup_generic_impl(), passing along how many miniflow words the
 * subtable's mask covers in each of the two map units. */
static uint32_t
dpcls_subtable_lookup_generic(struct dpcls_subtable *subtable,
                              uint32_t keys_map,
                              const struct netdev_flow_key *keys[],
                              struct dpcls_rule **rules)
{
    return lookup_generic_impl(subtable, keys_map, keys, rules,
                               subtable->mf_bits_set_unit0,
                               subtable->mf_bits_set_unit1);
}

/* Looks up the keys selected by 'keys_map' in 'subtable'.
 *
 *   subtable      the subtable being searched
 *   keys_map      bitmap of the packets still unmatched (bit i = keys[i])
 *   keys          the keys of those packets
 *   rules         rules[i] receives the rule matched by keys[i]
 *   bit_count_u0  number of mask words in map unit 0
 *   bit_count_u1  number of mask words in map unit 1
 *
 * Returns the bitmap of the packets that matched a rule in this subtable. */
static inline uint32_t ALWAYS_INLINE
lookup_generic_impl(struct dpcls_subtable *subtable,
                    uint32_t keys_map,
                    const struct netdev_flow_key *keys[],
                    struct dpcls_rule **rules,
                    const uint32_t bit_count_u0,
                    const uint32_t bit_count_u1)
{
    /* Number of packets to look up. */
    const uint32_t n_pkts = count_1bits(keys_map);
    ovs_assert(NETDEV_MAX_BURST >= n_pkts);
    uint32_t hashes[NETDEV_MAX_BURST];

    /* Scratch space is sized from the number of words in the mask, for the
     * largest possible batch (NETDEV_MAX_BURST packets). */
    const uint32_t bit_count_total = bit_count_u0 + bit_count_u1;
    const uint32_t block_count_required =
        bit_count_total * NETDEV_MAX_BURST;
    uint64_t *mf_masks = subtable->mf_masks;
    int i;

    /* Array holding the masked words of every packet of the batch. */
    uint64_t *blocks_scratch = get_blocks_scratch(block_count_required);

    /* AND every key with this subtable's mask. */
    ULLONG_FOR_EACH_1 (i, keys_map) {
        netdev_flow_key_flatten(keys[i],
                                &subtable->mask,    /* Subtable mask. */
                                mf_masks,           /* Masks precomputed from
                                                     * subtable->mask. */
                                &blocks_scratch[i * bit_count_total],
                                bit_count_u0,
                                bit_count_u1);
    }

    /* Hash of every key within this subtable, computed from the masked
     * words and the mask size in bytes. */
    ULLONG_FOR_EACH_1 (i, keys_map) {
        uint64_t *block_ptr = &blocks_scratch[i * bit_count_total];
        uint32_t hash = hash_add_words64(0, block_ptr, bit_count_total);
        hashes[i] = hash_finish(hash, bit_count_total * 8);
    }

    uint32_t found_map;
    const struct cmap_node *nodes[NETDEV_MAX_BURST];

    /* Fetch the cmap node for every hash.  Bit i of the result is set when
     * a node was found for key i. */
    found_map = cmap_find_batch(&subtable->rules, keys_map, hashes, nodes);

    ULLONG_FOR_EACH_1 (i, found_map) {
        struct dpcls_rule *rule;

        /* Different rules may share a hash, so check which one matches. */
        CMAP_NODE_FOR_EACH (rule, cmap_node, nodes[i]) {
            const uint32_t cidx = i * bit_count_total;
            /* Compare the masked key words with the rule. */
            uint32_t match = netdev_rule_matches_key(rule, bit_count_total,
                                                     &blocks_scratch[cidx]);

            if (OVS_LIKELY(match)) {
                rules[i] = rule;
                subtable->hit_cnt++;
                goto next;
            }
        }

        /* None found.  */
        ULLONG_SET0(found_map, i);  /* Did not match. */
    next:
        ;                     /* Keep Sparse happy. */
    }

    return found_map;
}
/* Applies the subtable 'mask' to 'key' and writes one masked 64-bit word per
 * mask word into 'blocks_scratch'.
 *
 * Per the original note, the 'mf_masks' consumed here are the ones produced
 * by dpcls_flow_key_gen_masks() in dpif-netdev.c.
 *
 *   key             the key being looked up
 *   mask            the subtable mask
 *   mf_masks        masks precomputed from the mask's map bits
 *   blocks_scratch  output; u0_count + u1_count words are written
 *   u0_count        number of 1-bits in the mask's map unit 0
 *   u1_count        number of 1-bits in the mask's map unit 1 */
static inline void
netdev_flow_key_flatten(const struct netdev_flow_key *key,
                        const struct netdev_flow_key *mask,
                        const uint64_t *mf_masks,
                        uint64_t *blocks_scratch,
                        const uint32_t u0_count,
                        const uint32_t u1_count)
{
    /* Load mask from subtable, mask with packet mf, popcount to get idx. */
    const uint64_t *pkt_blocks = miniflow_get_values(&key->mf);
    const uint64_t *tbl_blocks = miniflow_get_values(&mask->mf);

    /* Packet miniflow bits to be masked by pre-calculated mf_masks. */
    const uint64_t pkt_bits_u0 = key->mf.map.bits[0];
    const uint32_t pkt_bits_u0_pop = count_1bits(pkt_bits_u0);
    const uint64_t pkt_bits_u1 = key->mf.map.bits[1];

    /* Unit 0: AND the key with the mask for the words the mask cares about
     * and store the results at the start of 'blocks_scratch'. */
    netdev_flow_key_flatten_unit(&pkt_blocks[0],       /* Key values. */
                                 &tbl_blocks[0],       /* Mask values. */
                                 &mf_masks[0],         /* Precomputed masks. */
                                 &blocks_scratch[0],   /* Output. */
                                 pkt_bits_u0,          /* Key map unit 0. */
                                 u0_count);            /* Mask words in
                                                        * unit 0. */

    /* Unit 1: the key values of unit 0 have been consumed, so continue
     * after the 'pkt_bits_u0_pop' values that belong to unit 0. */
    netdev_flow_key_flatten_unit(&pkt_blocks[pkt_bits_u0_pop],
                                 &tbl_blocks[u0_count],
                                 &mf_masks[u0_count],
                                 &blocks_scratch[u0_count],
                                 pkt_bits_u1,
                                 u1_count);
}

/* Masks one map unit.
 *
 *   pkt_blocks      key values of this unit
 *   tbl_blocks      mask values of this unit
 *   mf_masks        precomputed masks of this unit
 *   blocks_scratch  output, 'count' words
 *   pkt_mf_bits     the key's map bits of this unit
 *   count           number of 1-bits in the mask's map bits of this unit
 *
 * Only the words the mask cares about (its 1-bits) need to be ANDed.  For
 * the i-th of those, the matching key word must be found: since a miniflow
 * stores only the words whose map bit is set, the index of a word in the
 * key's value array equals the number of key map bits set below that
 * position.  For example, with mask bits 11111 and key bits 10100, the mask's
 * third 1-bit (bit 2) is the key's first stored word, so the result is
 * tbl_blocks[2] & pkt_blocks[0].  Likewise, for key bits 1010111 the number
 * of words stored before bit 4 is count_1bits(1010111 & 0001111) = 3.
 *
 * Worked example (binary), count = 4:
 *   mask map bits = 111010
 *   mf_masks      = [1, 111, 1111, 11111]
 *   pkt_mf_bits   = 010100
 *   blocks_scratch = [0, 0, pkt_blocks[1] & tbl_blocks[2], 0]
 * The original article listed a six-element result here, which does not
 * match what the loop below computes for count = 4. */
static inline void
netdev_flow_key_flatten_unit(const uint64_t *pkt_blocks,
                             const uint64_t *tbl_blocks,
                             const uint64_t *mf_masks,
                             uint64_t *blocks_scratch,
                             const uint64_t pkt_mf_bits,
                             const uint32_t count)
{
    uint32_t i;

    for (i = 0; i < count; i++) {
        /* The values noted on each line are for i = 2 of the example. */
        uint64_t mf_mask = mf_masks[i];                 /* 001111 */
        /* Key map bits below the current mask bit. */
        uint64_t idx_bits = mf_mask & pkt_mf_bits;      /* 000100 */
        /* Index of the key word for this position. */
        const uint32_t pkt_idx = count_1bits(idx_bits); /* 1 */

        /* mf_mask + 1 is the current mask bit itself; the AND tells whether
         * the key has a word at that position. */
        uint64_t pkt_has_mf_bit = (mf_mask + 1) & pkt_mf_bits; /* 010000 */
        /* All-ones if the key has the word, zero otherwise, so a word that
         * is absent from the key contributes 0. */
        uint64_t no_bit = ((!pkt_has_mf_bit) > 0) - 1;  /* 2^64 - 1 */

        /* i-th mask word ANDed with the matching key word. */
        blocks_scratch[i] = pkt_blocks[pkt_idx] & tbl_blocks[i] & no_bit;
    }
}
/* Batched cmap lookup.  For every index 'i' whose bit is set in 'map', looks
 * up 'hashes[i]' in 'cmap' and stores the node found in 'nodes[i]'.
 *
 * Returns 'map' with the bits cleared for the indexes where nothing was
 * found.
 *
 * Every value has two candidate buckets, which is why two bucket arrays are
 * kept:
 *   - first:  hash1 = rehash(impl, hash),  buckets[hash1 & impl->mask]
 *   - second: hash2 = other_hash(hash1),   buckets[hash2 & impl->mask]
 * 'impl->mask' is the number of buckets minus one, so '& impl->mask' gives
 * the bucket index directly.  Per the original note, each bucket holds
 * CMAP_K slots that are scanned by cmap_find_in_bucket(). */
unsigned long
cmap_find_batch(const struct cmap *cmap, unsigned long map,
                uint32_t hashes[], const struct cmap_node *nodes[])
{
    const struct cmap_impl *impl = cmap_get_impl(cmap);
    unsigned long result = map;
    int i;
    /* One slot per bit of 'map': each bit stands for one packet. */
    uint32_t h1s[sizeof map * CHAR_BIT];
    const struct cmap_bucket *b1s[sizeof map * CHAR_BIT];
    const struct cmap_bucket *b2s[sizeof map * CHAR_BIT];
    uint32_t c1s[sizeof map * CHAR_BIT];

    /* Compute hashes and prefetch 1st buckets. */
    ULLONG_FOR_EACH_1(i, map) {
        h1s[i] = rehash(impl, hashes[i]);
        b1s[i] = &impl->buckets[h1s[i] & impl->mask];
        OVS_PREFETCH(b1s[i]);
    }
    /* Lookups, Round 1. Only look up at the first bucket. */
    ULLONG_FOR_EACH_1(i, map) {
        uint32_t c1;
        const struct cmap_bucket *b1 = b1s[i];
        const struct cmap_node *node;

        /* Retry until the bucket counter is unchanged across the read. */
        do {
            c1 = read_even_counter(b1);
            /* Is there a slot with this hash in the bucket? */
            node = cmap_find_in_bucket(b1, hashes[i]);
        } while (OVS_UNLIKELY(counter_changed(b1, c1)));

        if (!node) {
            /* Not found (yet); Prefetch the 2nd bucket. */
            b2s[i] = &impl->buckets[other_hash(h1s[i]) & impl->mask];
            OVS_PREFETCH(b2s[i]);
            c1s[i] = c1; /* We may need to check this after Round 2. */
            continue;
        }
        /* Found. */
        ULLONG_SET0(map, i); /* Ignore this on round 2. */
        OVS_PREFETCH(node);
        nodes[i] = node;
    }
    /* Round 2. Look into the 2nd bucket, if needed. */
    ULLONG_FOR_EACH_1(i, map) {
        uint32_t c2;
        const struct cmap_bucket *b2 = b2s[i];
        const struct cmap_node *node;

        do {
            c2 = read_even_counter(b2);
            node = cmap_find_in_bucket(b2, hashes[i]);
        } while (OVS_UNLIKELY(counter_changed(b2, c2)));

        if (!node) {
            /* The first bucket may have been modified since round 1.  If
             * its counter changed, search both buckets again. */
            if (OVS_UNLIKELY(counter_changed(b1s[i], c1s[i]))) {
                node = cmap_find__(b1s[i], b2s[i], hashes[i]);
                if (node) {
                    goto found;
                }
            }
            /* Not found. */
            ULLONG_SET0(result, i); /* Fix the result. */
            continue;
        }
found:
        OVS_PREFETCH(node);
        nodes[i] = node;
    }
    return result;
}
/* Handles the packets of 'packets_' that missed the EMC/SMC stage: dpcls
 * lookup first, then an upcall for whatever is still unmatched.
 *
 * 'keys[i]' is the key of packet i of the batch and 'index_map[i]' its
 * position in the originally received batch.  Packets that end up with a
 * flow are stored in 'flow_map' at that original position so the caller can
 * batch them in receive order. */
static inline void
fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                     struct dp_packet_batch *packets_,
                     struct netdev_flow_key **keys,
                     struct dp_packet_flow_map *flow_map,
                     uint8_t *index_map,
                     odp_port_t in_port)
{
    const size_t cnt = dp_packet_batch_size(packets_);
#if !defined(__CHECKER__) && !defined(_WIN32)
    const size_t PKT_ARRAY_SIZE = cnt;
#else
    /* Sparse or MSVC doesn't like variable length array. */
    enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
    struct dp_packet *packet;
    struct dpcls *cls;
    struct dpcls_rule *rules[PKT_ARRAY_SIZE];
    struct dp_netdev *dp = pmd->dp;
    int upcall_ok_cnt = 0, upcall_fail_cnt = 0;
    int lookup_cnt = 0, add_lookup_cnt;
    bool any_miss;

    for (size_t i = 0; i < cnt; i++) {
        /* Key length is needed in all the cases, hash computed on demand. */
        keys[i]->len = netdev_flow_key_size(miniflow_n_values(&keys[i]->mf));
    }
    /* Get the classifier for the in_port */
    /* Every port has its own dpcls; the original note's rationale is that
     * packets received on the same port tend to look alike. */
    cls = dp_netdev_pmd_lookup_dpcls(pmd, in_port);
    if (OVS_LIKELY(cls)) {
        /* Match the whole batch with dpcls_lookup(). */
        any_miss = !dpcls_lookup(cls, (const struct netdev_flow_key **)keys,
                                rules, cnt, &lookup_cnt);
    } else {
        any_miss = true;
        memset(rules, 0, sizeof(rules));
    }

    /* Misses need an OpenFlow table lookup, attempted only if the upcall
     * read lock can be taken without blocking. */
    if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
        uint64_t actions_stub[512 / 8], slow_stub[512 / 8];
        struct ofpbuf actions, put_actions;

        ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub);
        ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub);

        DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
            struct dp_netdev_flow *netdev_flow;

            if (OVS_LIKELY(rules[i])) {
                continue;
            }

            /* The flow table may have been updated in the meantime.
             * Looking once more before the upcall is much cheaper than the
             * upcall itself if the flow is found now. */
            netdev_flow = dp_netdev_pmd_lookup_flow(pmd, keys[i],
                                                    &add_lookup_cnt);
            if (netdev_flow) {
                lookup_cnt += add_lookup_cnt;
                rules[i] = &netdev_flow->cr;
                continue;
            }

            /* The first two levels failed, so the third level, the OpenFlow
             * tables, is consulted; this is the "upcall".  Per the original
             * note, plain OVS does this over netlink while OVS+DPDK calls
             * upcall_cb directly from the PMD thread.  The note adds that
             * when a flow is found and is to be installed into the dpcls,
             * the maximum flow limit is checked first. */
            int error = handle_packet_upcall(pmd, packet, keys[i],
                                             &actions, &put_actions);

            if (OVS_UNLIKELY(error)) {
                upcall_fail_cnt++;
            } else {
                upcall_ok_cnt++;
            }
        }

        ofpbuf_uninit(&actions);
        ofpbuf_uninit(&put_actions);
        fat_rwlock_unlock(&dp->upcall_rwlock);
    } else if (OVS_UNLIKELY(any_miss)) {
        /* The lock could not be taken: unmatched packets are dropped and
         * counted as failed upcalls. */
        DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
            if (OVS_UNLIKELY(!rules[i])) {
                dp_packet_delete(packet);
                COVERAGE_INC(datapath_drop_lock_error);
                upcall_fail_cnt++;
            }
        }
    }

    DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
        struct dp_netdev_flow *flow;
        /* Get the original order of this packet in received batch. */
        int recv_idx = index_map[i];
        uint16_t tcp_flags;

        if (OVS_UNLIKELY(!rules[i])) {
            continue;
        }

        flow = dp_netdev_flow_cast(rules[i]);
        bool hook_cached = false;
        /* Give the optional flow-miss hook the first chance; the cache
         * updates below are skipped if it reports the flow as cached. */
        if (pmd->cached_hook && \
                pmd->cached_hook_pmd && \
                pmd->cached_hook->hook_flow_miss) {
            hook_cached = pmd->cached_hook->hook_flow_miss(pmd->cached_hook_pmd, packet, flow);
        }
        if (!hook_cached) {
            bool smc_enable_db;
            atomic_read_relaxed(&pmd->dp->smc_enable_db, &smc_enable_db);
            /* A flow was found: update the SMC if it is enabled. */
            if (smc_enable_db) {
                uint32_t hash = dp_netdev_flow_hash(&flow->ufid);
                smc_insert(pmd, keys[i], hash);
            }

            /* Possibly write the flow back into the first-level cache
             * (EMC). */
            if (emc_probabilistic_insert(pmd, keys[i], flow)) {
                if (flow->status == OFFLOAD_NONE) {
                    queue_netdev_flow_put(pmd->dp->dp_flow_offload, \
                            pmd->dp->class, \
                            flow, NULL, DP_NETDEV_FLOW_OFFLOAD_OP_ADD);
                }
            }
        }
        /* Add these packets into the flow map in the same order
         * as received.
         */
        tcp_flags = miniflow_get_tcp_flags(&keys[i]->mf);
        packet_enqueue_to_flow_map(packet, flow, tcp_flags,
                                   flow_map, recv_idx);
    }

    /* Update the statistics. */
    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_MASKED_HIT,
                            cnt - upcall_ok_cnt - upcall_fail_cnt);
    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_MASKED_LOOKUP,
                            lookup_cnt);
    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_MISS,
                            upcall_ok_cnt);
    pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_LOST,
                            upcall_fail_cnt);
}
/* Records in the PMD's SMC that packets with 'key->hash' map to the flow
 * whose flow-table hash is 'hash'.
 *
 * The SMC bucket is chosen with 'key->hash & SMC_MASK'.  The slot stores the
 * upper 16 bits of 'key->hash' as signature and the 16-bit index of the flow
 * within pmd->flow_table. */
static inline void
smc_insert(struct dp_netdev_pmd_thread *pmd,
           const struct netdev_flow_key *key,
           uint32_t hash)
{
    struct smc_cache *smc_cache = &(pmd->flow_cache).smc_cache;
    struct smc_bucket *bucket = &smc_cache->buckets[key->hash & SMC_MASK];
    uint16_t index;
    uint32_t cmap_index;
    int i;

    /* Find the flow's index in the flow table (the original note points out
     * that the cmap uses cuckoo hashing). */
    cmap_index = cmap_find_index(&pmd->flow_table, hash);
    index = (cmap_index >= UINT16_MAX) ? UINT16_MAX : (uint16_t)cmap_index;

    /* If the index is larger than SMC can handle (uint16_t), we don't
     * insert */
    if (index == UINT16_MAX) {
        /* The index cannot be stored in 16 bits, so there is nothing to
         * record.  (The original article's note here, "means it was
         * found", contradicts the code.) */
        return;
    }

    /* If an entry with same signature already exists, update the index */
    uint16_t sig = key->hash >> 16;
    for (i = 0; i < SMC_ENTRY_PER_BUCKET; i++) {
        if (bucket->sig[i] == sig) {
            bucket->flow_idx[i] = index;
            return;
        }
    }
    /* If there is an empty entry, occupy it. */
    for (i = 0; i < SMC_ENTRY_PER_BUCKET; i++) {
        if (bucket->flow_idx[i] == UINT16_MAX) {
            bucket->sig[i] = sig;
            bucket->flow_idx[i] = index;
            return;
        }
    }
    /* Otherwise, pick a random entry. */
    i = random_uint32() % SMC_ENTRY_PER_BUCKET;
    bucket->sig[i] = sig;
    bucket->flow_idx[i] = index;
}
static bool cmap_try_insert(struct cmap_impl *impl, struct cmap_node *node, uint32_t hash) { uint32_t h1 = rehash(impl, hash); uint32_t h2 = other_hash(h1); // hash兩次找到兩個桶 struct cmap_bucket *b1 = &impl->buckets[h1 & impl->mask]; struct cmap_bucket *b2 = &impl->buckets[h2 & impl->mask]; // 插入規則: // 1.是否有相同hash的node,就插到對應鏈上 // 2.沒有相同hash,就看有沒有空的node // 3.都不行就通過bfs,看能否讓b1,b2空出來一個,把這個放進去 // 都不行就插入失敗 return (OVS_UNLIKELY(cmap_insert_dup(node, hash, b1) || cmap_insert_dup(node, hash, b2)) || OVS_LIKELY(cmap_insert_bucket(node, hash, b1) || cmap_insert_bucket(node, hash, b2)) || cmap_insert_bfs(impl, node, hash, b1, b2)); }