進軍Python全棧開發--14.Python中的數據結構與演算法

2020-08-13 20:28:12

Python語言進階

重要知識點

  • 生成式(推導式)的用法

    prices = {
        'AAPL': 191.88,
        'GOOG': 1186.96,
        'IBM': 149.24,
        'ORCL': 48.44,
        'ACN': 166.89,
        'FB': 208.09,
        'SYMC': 21.29
    }
    # 用股票價格大於100元的股票構造一個新的字典
    prices2 = {key: value for key, value in prices.items() if value > 100}
    print(prices2)
    

    說明:生成式(推導式)可以用來生成列表、集合和字典。

  • 巢狀的列表的坑

    names = ['關羽', '張飛', '趙雲', '馬超', '黃忠']
    courses = ['語文', '數學', '英語']
    # 錄入五個學生三門課程的成績
    # 錯誤 - 參考http://pythontutor.com/visualize.html#mode=edit
    # scores = [[None] * len(courses)] * len(names)
    scores = [[None] * len(courses) for _ in range(len(names))]
    for row, name in enumerate(names):
        for col, course in enumerate(courses):
            scores[row][col] = float(input(f'請輸入{name}的{course}成績: '))
            print(scores)
    

    Python Tutor - VISUALIZE CODE AND GET LIVE HELP

  • heapq模組(堆排序)

    """
    從列表中找出最大的或最小的N個元素
    堆結構(大根堆/小根堆)
    """
    import heapq
    
    list1 = [34, 25, 12, 99, 87, 63, 58, 78, 88, 92]
    list2 = [
        {'name': 'IBM', 'shares': 100, 'price': 91.1},
        {'name': 'AAPL', 'shares': 50, 'price': 543.22},
        {'name': 'FB', 'shares': 200, 'price': 21.09},
        {'name': 'HPQ', 'shares': 35, 'price': 31.75},
        {'name': 'YHOO', 'shares': 45, 'price': 16.35},
        {'name': 'ACME', 'shares': 75, 'price': 115.65}
    ]
    print(heapq.nlargest(3, list1))
    print(heapq.nsmallest(3, list1))
    print(heapq.nlargest(2, list2, key=lambda x: x['price']))
    print(heapq.nlargest(2, list2, key=lambda x: x['shares']))
    
  • itertools模組

    """
    迭代工具模組
    """
    import itertools
    
    # 產生ABCD的全排列
    itertools.permutations('ABCD')
    # 產生ABCDE的五選三組合
    itertools.combinations('ABCDE', 3)
    # 產生ABCD和123的笛卡爾積
    itertools.product('ABCD', '123')
    # 產生ABC的無限回圈序列
    itertools.cycle(('A', 'B', 'C'))
    
  • collections模組

    常用的工具類:

    • namedtuple:命令元組,它是一個類工廠,接受型別的名稱和屬性列表來建立一個類。
    • deque:雙端佇列,是列表的替代實現。Python中的列表底層是基於陣列來實現的,而deque底層是雙向鏈表,因此當你需要在頭尾新增和刪除元素是,deque會表現出更好的效能,漸近時間複雜度爲O(1)O(1)
    • Counterdict的子類,鍵是元素,值是元素的計數,它的most_common()方法可以幫助我們獲取出現頻率最高的元素。Counterdict的繼承關係我認爲是值得商榷的,按照CARP原則,Counterdict的關係應該設計爲關聯關係更爲合理。
    • OrderedDictdict的子類,它記錄了鍵值對插入的順序,看起來既有字典的行爲,也有鏈表的行爲。
    • defaultdict:類似於字典型別,但是可以通過預設的工廠函數來獲得鍵對應的預設值,相比字典中的setdefault()方法,這種做法更加高效。
    """
    找出序列中出現次數最多的元素
    """
    from collections import Counter
    
    words = [
        'look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes',
        'the', 'eyes', 'the', 'eyes', 'the', 'eyes', 'not', 'around',
        'the', 'eyes', "don't", 'look', 'around', 'the', 'eyes',
        'look', 'into', 'my', 'eyes', "you're", 'under'
    ]
    counter = Counter(words)
    print(counter.most_common(3))
    

數據結構和演算法

  • 演算法:解決問題的方法和步驟

  • 評價演算法的好壞:漸近時間複雜度和漸近空間複雜度。

  • 漸近時間複雜度的大O標記:

    • - 常數時間複雜度 - 布隆過濾器 / 雜湊儲存
    • - 對數時間複雜度 - 折半查詢(二分查詢)
    • - 線性時間複雜度 - 順序查詢 / 計數排序
    • - 對數線性時間複雜度 - 高階排序演算法(歸併排序、快速排序)
    • - 平方時間複雜度 - 簡單排序演算法(選擇排序、插入排序、氣泡排序)
    • - 立方時間複雜度 - Floyd演算法 / 矩陣乘法運算
    • - 幾何級數時間複雜度 - 漢諾塔
    • - 階乘時間複雜度 - 旅行經銷商問題 - NPC

    [外連圖片轉存失敗,源站可能有防盜鏈機制 機製,建議將圖片儲存下來直接上傳(img-m1RkDEpC-1597321604207)(./res/algorithm_complexity_1.png)]

    [外連圖片轉存失敗,源站可能有防盜鏈機制 機製,建議將圖片儲存下來直接上傳(img-mAsZk2Ol-1597321604209)(./res/algorithm_complexity_2.png)]

  • 排序演算法(選擇、冒泡和歸併)和查詢演算法(順序和折半)

    def select_sort(items, comp=lambda x, y: x < y):
        """簡單選擇排序"""
        items = items[:]
        for i in range(len(items) - 1):
            min_index = i
            for j in range(i + 1, len(items)):
                if comp(items[j], items[min_index]):
                    min_index = j
            items[i], items[min_index] = items[min_index], items[i]
        return items
    
    def bubble_sort(items, comp=lambda x, y: x > y):
        """氣泡排序"""
        items = items[:]
        for i in range(len(items) - 1):
            swapped = False
            for j in range(i, len(items) - 1 - i):
                if comp(items[j], items[j + 1]):
                    items[j], items[j + 1] = items[j + 1], items[j]
                    swapped = True
            if not swapped:
                break
        return items
    
    def bubble_sort(items, comp=lambda x, y: x > y):
        """攪拌排序(氣泡排序升級版)"""
        items = items[:]
        for i in range(len(items) - 1):
            swapped = False
            for j in range(i, len(items) - 1 - i):
                if comp(items[j], items[j + 1]):
                    items[j], items[j + 1] = items[j + 1], items[j]
                    swapped = True
            if swapped:
                swapped = False
                for j in range(len(items) - 2 - i, i, -1):
                    if comp(items[j - 1], items[j]):
                        items[j], items[j - 1] = items[j - 1], items[j]
                        swapped = True
            if not swapped:
                break
        return items
    
    def merge(items1, items2, comp=lambda x, y: x < y):
        """合併(將兩個有序的列表合併成一個有序的列表)"""
        items = []
        index1, index2 = 0, 0
        while index1 < len(items1) and index2 < len(items2):
            if comp(items1[index1], items2[index2]):
                items.append(items1[index1])
                index1 += 1
            else:
                items.append(items2[index2])
                index2 += 1
        items += items1[index1:]
        items += items2[index2:]
        return items
    
    
    def merge_sort(items, comp=lambda x, y: x < y):
        return _merge_sort(list(items), comp)
    
    
    def _merge_sort(items, comp):
        """歸併排序"""
        if len(items) < 2:
            return items
        mid = len(items) // 2
        left = _merge_sort(items[:mid], comp)
        right = _merge_sort(items[mid:], comp)
        return merge(left, right, comp)
    
    def seq_search(items, key):
        """順序查詢"""
        for index, item in enumerate(items):
            if item == key:
                return index
        return -1
    
    def bin_search(items, key):
        """折半查詢"""
        start, end = 0, len(items) - 1
        while start <= end:
            mid = (start + end) // 2
            if key > items[mid]:
                start = mid + 1
            elif key < items[mid]:
                end = mid - 1
            else:
                return mid
        return -1
    
  • 常用演算法:

    • 窮舉法 - 又稱爲暴力破解法,對所有的可能性進行驗證,直到找到正確答案。
    • 貪婪法 - 在對問題求解時,總是做出在當前看來
    • 最好的選擇,不追求最優解,快速找到滿意解。
    • 分治法 - 把一個複雜的問題分成兩個或更多的相同或相似的子問題,再把子問題分成更小的子問題,直到可以直接求解的程度,最後將子問題的解進行合併得到原問題的解。
    • 回溯法 - 回溯法又稱爲試探法,按選優條件向前搜尋,當搜尋到某一步發現原先選擇並不優或達不到目標時,就退回一步重新選擇。
    • 動態規劃 - 基本思想也是將待求解問題分解成若幹個子問題,先求解並儲存這些子問題的解,避免產生大量的重複運算。

    窮舉法例子:百錢百雞和五人分魚。

    # 公雞5元一隻 母雞3元一隻 小雞1元三隻
    # 用100元買100只雞 問公雞/母雞/小雞各多少隻
    for x in range(20):
        for y in range(33):
            z = 100 - x - y
            if 5 * x + 3 * y + z // 3 == 100 and z % 3 == 0:
                print(x, y, z)
    
    # A、B、C、D、E五人在某天夜裏合夥捕魚 最後疲憊不堪各自睡覺
    # 第二天A第一個醒來 他將魚分爲5份 扔掉多餘的1條 拿走自己的一份
    # B第二個醒來 也將魚分爲5份 扔掉多餘的1條 拿走自己的一份
    # 然後C、D、E依次醒來也按同樣的方式分魚 問他們至少捕了多少條魚
    fish = 6
    while True:
        total = fish
        enough = True
        for _ in range(5):
            if (total - 1) % 5 == 0:
                total = (total - 1) // 5 * 4
            else:
                enough = False
                break
        if enough:
            print(fish)
            break
        fish += 5
    

    貪婪法例子:假設小偷有一個揹包,最多能裝20公斤贓物,他闖入一戶人家,發現如下表所示的物品。很顯然,他不能把所有物品都裝進揹包,所以必須確定拿走哪些物品,留下哪些物品。

    名稱 價格(美元) 重量(kg)
    電腦 200 20
    收音機 20 4
    175 10
    花瓶 50 2
    10 1
    油畫 90 9
    """
    貪婪法:在對問題求解時,總是做出在當前看來是最好的選擇,不追求最優解,快速找到滿意解。
    輸入:
    20 6
    電腦 200 20
    收音機 20 4
    鐘 175 10
    花瓶 50 2
    書 10 1
    油畫 90 9
    """
    class Thing(object):
        """物品"""
    
        def __init__(self, name, price, weight):
            self.name = name
            self.price = price
            self.weight = weight
    
        @property
        def value(self):
            """價格重量比"""
            return self.price / self.weight
    
    
    def input_thing():
        """輸入物品資訊"""
        name_str, price_str, weight_str = input().split()
        return name_str, int(price_str), int(weight_str)
    
    
    def main():
        """主函數"""
        max_weight, num_of_things = map(int, input().split())
        all_things = []
        for _ in range(num_of_things):
            all_things.append(Thing(*input_thing()))
        all_things.sort(key=lambda x: x.value, reverse=True)
        total_weight = 0
        total_price = 0
        for thing in all_things:
            if total_weight + thing.weight <= max_weight:
                print(f'小偷拿走了{thing.name}')
                total_weight += thing.weight
                total_price += thing.price
        print(f'總價值: {total_price}美元')
    
    
    if __name__ == '__main__':
        main()
    

    分治法例子:快速排序

    """
    快速排序 - 選擇樞軸對元素進行劃分,左邊都比樞軸小右邊都比樞軸大
    """
    def quick_sort(items, comp=lambda x, y: x <= y):
        items = list(items)[:]
        _quick_sort(items, 0, len(items) - 1, comp)
        return items
    
    
    def _quick_sort(items, start, end, comp):
        if start < end:
            pos = _partition(items, start, end, comp)
            _quick_sort(items, start, pos - 1, comp)
            _quick_sort(items, pos + 1, end, comp)
    
    
    def _partition(items, start, end, comp):
        pivot = items[end]
        i = start - 1
        for j in range(start, end):
            if comp(items[j], pivot):
                i += 1
                items[i], items[j] = items[j], items[i]
        items[i + 1], items[end] = items[end], items[i + 1]
        return i + 1
    

    回溯法例子:騎士巡邏

    """
    遞歸回溯法:叫稱爲試探法,按選優條件向前搜尋,當搜尋到某一步,發現原先選擇並不優或達不到目標時,就退回一步重新選擇,比較經典的問題包括騎士巡邏、八皇後和迷宮尋路等。
    """
    import sys
    import time
    
    SIZE = 5
    total = 0
    
    
    def print_board(board):
        for row in board:
            for col in row:
                print(str(col).center(4), end='')
            print()
    
    
    def patrol(board, row, col, step=1):
        if row >= 0 and row < SIZE and \
            col >= 0 and col < SIZE and \
            board[row][col] == 0:
            board[row][col] = step
            if step == SIZE * SIZE:
                global total
                total += 1
                print(f'第{total}種走法: ')
                print_board(board)
            patrol(board, row - 2, col - 1, step + 1)
            patrol(board, row - 1, col - 2, step + 1)
            patrol(board, row + 1, col - 2, step + 1)
            patrol(board, row + 2, col - 1, step + 1)
            patrol(board, row + 2, col + 1, step + 1)
            patrol(board, row + 1, col + 2, step + 1)
            patrol(board, row - 1, col + 2, step + 1)
            patrol(board, row - 2, col + 1, step + 1)
            board[row][col] = 0
    
    
    def main():
        board = [[0] * SIZE for _ in range(SIZE)]
        patrol(board, SIZE - 1, SIZE - 1)
    
    
    if __name__ == '__main__':
        main()
    

    動態規劃例子:子列表元素之和的最大值。

    說明:子列表指的是列表中索引(下標)連續的元素構成的列表;列表中的元素是int型別,可能包含正整數、0、負整數;程式輸入列表中的元素,輸出子列表元素求和的最大值,例如:

    輸入:1 -2 3 5 -3 2

    輸出:8

    輸入:0 -2 3 5 -1 2

    輸出:9

    輸入:-9 -2 -3 -5 -3

    輸出:-2

    def main():
        items = list(map(int, input().split()))
        overall = partial = items[0]
        for i in range(1, len(items)):
            partial = max(items[i], partial + items[i])
            overall = max(partial, overall)
        print(overall)
    
    
    if __name__ == '__main__':
        main()
    

    說明:這個題目最容易想到的解法是使用二重回圈,但是程式碼的時間效能將會變得非常的糟糕。使用動態規劃的思想,僅僅是多用了兩個變數,就將原來O(N2)O(N^2)複雜度的問題變成了O(N)O(N)

函數的使用方式

  • 將函數視爲「一等公民」

    • 函數可以賦值給變數
    • 函數可以作爲函數的參數
    • 函數可以作爲函數的返回值
  • 高階函數的用法(filtermap以及它們的替代品)

    items1 = list(map(lambda x: x ** 2, filter(lambda x: x % 2, range(1, 10))))
    items2 = [x ** 2 for x in range(1, 10) if x % 2]
    
  • 位置參數、可變參數、關鍵字參數、命名關鍵字參數

  • 參數的元資訊(程式碼可讀性問題)

  • 匿名函數和行內函式的用法(lambda函數)

  • 閉包和作用域問題

    • Python搜尋變數的LEGB順序(Local >>> Embedded >>> Global >>> Built-in)

    • globalnonlocal關鍵字的作用

      global:宣告或定義全域性變數(要麼直接使用現有的全域性作用域的變數,要麼定義一個變數放到全域性作用域)。

      nonlocal:宣告使用巢狀作用域的變數(巢狀作用域必須存在該變數,否則報錯)。

  • 裝飾器函數(使用裝飾器和取消裝飾器)

    例子:輸出函數執行時間的裝飾器。

    def record_time(func):
        """自定義裝飾函數的裝飾器"""
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time()
            result = func(*args, **kwargs)
            print(f'{func.__name__}: {time() - start}秒')
            return result
            
        return wrapper
    

    如果裝飾器不希望跟print函數耦合,可以編寫可以參數化的裝飾器。

    from functools import wraps
    from time import time
    
    
    def record(output):
        """可以參數化的裝飾器"""
    	
    	def decorate(func):
    		
    		@wraps(func)
    		def wrapper(*args, **kwargs):
    			start = time()
    			result = func(*args, **kwargs)
    			output(func.__name__, time() - start)
    			return result
                
    		return wrapper
    	
    	return decorate
    
    from functools import wraps
    from time import time
    
    
    class Record():
        """通過定義類的方式定義裝飾器"""
    
        def __init__(self, output):
            self.output = output
    
        def __call__(self, func):
    
            @wraps(func)
            def wrapper(*args, **kwargs):
                start = time()
                result = func(*args, **kwargs)
                self.output(func.__name__, time() - start)
                return result
    
            return wrapper
    

    說明:由於對帶裝飾功能的函數新增了@wraps裝飾器,可以通過func.__wrapped__方式獲得被裝飾之前的函數或類來取消裝飾器的作用。

    例子:用裝飾器來實現單例模式。

    from functools import wraps
    
    
    def singleton(cls):
        """裝飾類的裝飾器"""
        instances = {}
    
        @wraps(cls)
        def wrapper(*args, **kwargs):
            if cls not in instances:
                instances[cls] = cls(*args, **kwargs)
            return instances[cls]
    
        return wrapper
    
    
    @singleton
    class President:
        """總統(單例類)"""
        pass
    

    提示:上面的程式碼中用到了閉包(closure),不知道你是否已經意識到了。還沒有一個小問題就是,上面的程式碼並沒有實現執行緒安全的單例,如果要實現執行緒安全的單例應該怎麼做呢?

    執行緒安全的單例裝飾器。

    from functools import wraps
    from threading import RLock
    
    
    def singleton(cls):
        """執行緒安全的單例裝飾器"""
        instances = {}
        locker = RLock()
    
        @wraps(cls)
        def wrapper(*args, **kwargs):
            if cls not in instances:
                with locker:
                    if cls not in instances:
                        instances[cls] = cls(*args, **kwargs)
            return instances[cls]
    
        return wrapper
    

    提示:上面的程式碼用到了with上下文語法來進行鎖操作,因爲鎖物件本身就是上下文管理器物件(支援__enter____exit__魔術方法)。在wrapper函數中,我們先做了一次不帶鎖的檢查,然後再做帶鎖的檢查,這樣做比直接加鎖檢查效能要更好,如果物件已經建立就沒有必須再去加鎖而是直接返回該物件就可以了。

物件導向相關知識

  • 三大支柱:封裝、繼承、多型

    例子:工資結算系統。

    """
    月薪結算系統 - 部門經理每月15000 程式設計師每小時200 銷售員1800底薪加銷售額5%提成
    """
    from abc import ABCMeta, abstractmethod
    
    
    class Employee(metaclass=ABCMeta):
        """員工(抽象類)"""
    
        def __init__(self, name):
            self.name = name
    
        @abstractmethod
        def get_salary(self):
            """結算月薪(抽象方法)"""
            pass
    
    
    class Manager(Employee):
        """部門經理"""
    
        def get_salary(self):
            return 15000.0
    
    
    class Programmer(Employee):
        """程式設計師"""
    
        def __init__(self, name, working_hour=0):
            self.working_hour = working_hour
            super().__init__(name)
    
        def get_salary(self):
            return 200.0 * self.working_hour
    
    
    class Salesman(Employee):
        """銷售員"""
    
        def __init__(self, name, sales=0.0):
            self.sales = sales
            super().__init__(name)
    
        def get_salary(self):
            return 1800.0 + self.sales * 0.05
    
    
    class EmployeeFactory:
        """建立員工的工廠(工廠模式 - 通過工廠實現物件使用者和物件之間的解耦合)"""
    
        @staticmethod
        def create(emp_type, *args, **kwargs):
            """建立員工"""
            all_emp_types = {'M': Manager, 'P': Programmer, 'S': Salesman}
            cls = all_emp_types[emp_type.upper()]
            return cls(*args, **kwargs) if cls else None
    
    
    def main():
        """主函數"""
        emps = [
            EmployeeFactory.create('M', '曹操'), 
            EmployeeFactory.create('P', '荀彧', 120),
            EmployeeFactory.create('P', '郭嘉', 85), 
            EmployeeFactory.create('S', '典韋', 123000),
        ]
        for emp in emps:
            print(f'{emp.name}: {emp.get_salary():.2f}元')
    
    
    if __name__ == '__main__':
        main()
    
  • 類與類之間的關係

    • is-a關係:繼承
    • has-a關係:關聯 / 聚合 / 合成
    • use-a關係:依賴

    例子:撲克遊戲。

    """
    經驗:符號常數總是優於字面常數,列舉型別是定義符號常數的最佳選擇
    """
    from enum import Enum, unique
    
    import random
    
    
    @unique
    class Suite(Enum):
        """花色"""
    
        SPADE, HEART, CLUB, DIAMOND = range(4)
    
        def __lt__(self, other):
            return self.value < other.value
    
    
    class Card():
        """牌"""
    
        def __init__(self, suite, face):
            """初始化方法"""
            self.suite = suite
            self.face = face
    
        def show(self):
            """顯示牌面"""
            suites = ['♠︎', '♥︎', '♣︎', '♦︎']
            faces = ['', 'A', '2', '3', '4', '5', '6', '7', '8', '9', '10', 'J', 'Q', 'K']
            return f'{suites[self.suite.value]}{faces[self.face]}'
    
        def __repr__(self):
            return self.show()
    
    
    class Poker():
        """撲克"""
    
        def __init__(self):
            self.index = 0
            self.cards = [Card(suite, face)
                          for suite in Suite
                          for face in range(1, 14)]
    
        def shuffle(self):
            """洗牌(隨機亂序)"""
            random.shuffle(self.cards)
            self.index = 0
    
        def deal(self):
            """發牌"""
            card = self.cards[self.index]
            self.index += 1
            return card
    
        @property
        def has_more(self):
            return self.index < len(self.cards)
    
    
    class Player():
        """玩家"""
    
        def __init__(self, name):
            self.name = name
            self.cards = []
    
        def get_one(self, card):
            """摸一張牌"""
            self.cards.append(card)
    
        def sort(self, comp=lambda card: (card.suite, card.face)):
            """整理手上的牌"""
            self.cards.sort(key=comp)
    
    
    def main():
        """主函數"""
        poker = Poker()
        poker.shuffle()
        players = [Player('東邪'), Player('西毒'), Player('南帝'), Player('北丐')]
        while poker.has_more:
            for player in players:
                    player.get_one(poker.deal())
        for player in players:
            player.sort()
            print(player.name, end=': ')
            print(player.cards)
    
    
    if __name__ == '__main__':
        main()
    

    說明:上面的程式碼中使用了Emoji字元來表示撲克牌的四種花色,在某些不支援Emoji字元的系統上可能無法顯示。

  • 物件的複製(深複製/深拷貝/深度克隆和淺複製/淺拷貝/影子克隆)

  • 垃圾回收、回圈參照和弱參照

    Python使用了自動化記憶體管理,這種管理機制 機製以參照計數爲基礎,同時也引入了標記-清除分代收集兩種機制 機製爲輔的策略。

    typedef struct_object {
        /* 參照計數 */
        int ob_refcnt;
        /* 物件指針 */
        struct_typeobject *ob_type;
    } PyObject;
    
    /* 增加參照計數的宏定義 */
    #define Py_INCREF(op)   ((op)->ob_refcnt++)
    /* 減少參照計數的宏定義 */
    #define Py_DECREF(op) \ //減少計數
        if (--(op)->ob_refcnt != 0) \
            ; \
        else \
            __Py_Dealloc((PyObject *)(op))
    

    導致參照計數+1的情況:

    • 物件被建立,例如a = 23
    • 物件被參照,例如b = a
    • 物件被作爲參數,傳入到一個函數中,例如f(a)
    • 物件作爲一個元素,儲存在容器中,例如list1 = [a, a]

    導致參照計數-1的情況:

    • 物件的別名被顯式銷燬,例如del a
    • 物件的別名被賦予新的物件,例如a = 24
    • 一個物件離開它的作用域,例如f函數執行完畢時,f函數中的區域性變數(全域性變數不會)
    • 物件所在的容器被銷燬,或從容器中刪除物件

    參照計數可能會導致回圈參照問題,而回圈參照會導致記憶體泄露,如下面 下麪的程式碼所示。爲了解決這個問題,Python中引入了「標記-清除」和「分代收集」。在建立一個物件的時候,物件被放在第一代中,如果在第一代的垃圾檢查中物件存活了下來,該物件就會被放到第二代中,同理在第二代的垃圾檢查中物件存活下來,該物件就會被放到第三代中。

    # 回圈參照會導致記憶體泄露 - Python除了參照技術還引入了標記清理和分代回收
    # 在Python 3.6以前如果重寫__del__魔術方法會導致回圈參照處理失效
    # 如果不想造成回圈參照可以使用弱參照
    list1 = []
    list2 = [] 
    list1.append(list2)
    list2.append(list1)
    

    以下情況會導致垃圾回收:

    • 呼叫gc.collect()
    • gc模組的計數器達到閥值
    • 程式退出

    如果回圈參照中兩個物件都定義了__del__方法,gc模組不會銷燬這些不可達物件,因爲gc模組不知道應該先呼叫哪個物件的__del__方法,這個問題在Python 3.6中得到瞭解決。

    也可以通過weakref模組構造弱參照的方式來解決回圈參照的問題。

  • 魔法屬性和方法(請參考《Python魔法方法指南》)

    有幾個小問題請大家思考:

    • 自定義的物件能不能使用運算子做運算?
    • 自定義的物件能不能放到set中?能去重嗎?
    • 自定義的物件能不能作爲dict的鍵?
    • 自定義的物件能不能使用上下文語法?
  • 混入(Mixin)

    例子:自定義字典限制只有在指定的key不存在時才能 纔能在字典中設定鍵值對。

    class SetOnceMappingMixin:
        """自定義混入類"""
        __slots__ = ()
    
        def __setitem__(self, key, value):
            if key in self:
                raise KeyError(str(key) + ' already set')
            return super().__setitem__(key, value)
    
    
    class SetOnceDict(SetOnceMappingMixin, dict):
        """自定義字典"""
        pass
    
    
    my_dict= SetOnceDict()
    try:
        my_dict['username'] = 'jackfrued'
        my_dict['username'] = 'hellokitty'
    except KeyError:
        pass
    print(my_dict)
    
  • 超程式設計和元類

    物件是通過類建立的,類是通過元類建立的,元類提供了建立類的元資訊。所有的類都直接或間接的繼承自object,所有的元類都直接或間接的繼承自type

    例子:用元類實現單例模式。

    import threading
    
    
    class SingletonMeta(type):
        """自定義元類"""
    
        def __init__(cls, *args, **kwargs):
            cls.__instance = None
            cls.__lock = threading.RLock()
            super().__init__(*args, **kwargs)
    
        def __call__(cls, *args, **kwargs):
            if cls.__instance is None:
                with cls.__lock:
                    if cls.__instance is None:
                        cls.__instance = super().__call__(*args, **kwargs)
            return cls.__instance
    
    
    class President(metaclass=SingletonMeta):
        """總統(單例類)"""
        
        pass
    
  • 物件導向設計原則

    • 單一職責原則 (SRP)- 一個類只做該做的事情(類的設計要高內聚)
    • 開閉原則 (OCP)- 軟體實體應該對擴充套件開發對修改關閉
    • 依賴倒轉原則(DIP)- 面向抽象程式設計(在弱型別語言中已經被弱化)
    • 裡氏替換原則(LSP) - 任何時候可以用子類物件替換掉父類別物件
    • 介面隔離原則(ISP)- 介面要小而專不要大而全(Python中沒有介面的概念)
    • 合成聚合複用原則(CARP) - 優先使用強關聯關係而不是繼承關係複用程式碼
    • 最少知識原則(迪米特法則,LoD)- 不要給沒有必然聯繫的物件發訊息

    說明:上面加粗的字母放在一起稱爲物件導向的SOLID原則。

  • GoF設計模式

    • 建立型模式:單例、工廠、建造者、原型
    • 結構型模式:適配器、門面(外觀)、代理
    • 行爲型模式:迭代器、觀察者、狀態、策略

    例子:可插拔的雜湊演算法(策略模式)。

    class StreamHasher():
        """雜湊摘要生成器"""
    
        def __init__(self, alg='md5', size=4096):
            self.size = size
            alg = alg.lower()
            self.hasher = getattr(__import__('hashlib'), alg.lower())()
    
        def __call__(self, stream):
            return self.to_digest(stream)
    
        def to_digest(self, stream):
            """生成十六進制形式的摘要"""
            for buf in iter(lambda: stream.read(self.size), b''):
                self.hasher.update(buf)
            return self.hasher.hexdigest()
    
    def main():
        """主函數"""
        hasher1 = StreamHasher()
        with open('Python-3.7.6.tgz', 'rb') as stream:
            print(hasher1.to_digest(stream))
        hasher2 = StreamHasher('sha1')
        with open('Python-3.7.6.tgz', 'rb') as stream:
            print(hasher2(stream))
    
    
    if __name__ == '__main__':
        main()
    

迭代器和生成器

  • 迭代器是實現了迭代器協定的物件。

    • Python中沒有像protocolinterface這樣的定義協定的關鍵字。
    • Python中用魔術方法表示協定。
    • __iter____next__魔術方法就是迭代器協定。
    class Fib(object):
        """迭代器"""
        
        def __init__(self, num):
            self.num = num
            self.a, self.b = 0, 1
            self.idx = 0
       
        def __iter__(self):
            return self
    
        def __next__(self):
            if self.idx < self.num:
                self.a, self.b = self.b, self.a + self.b
                self.idx += 1
                return self.a
            raise StopIteration()
    
  • 生成器是語法簡化版的迭代器。

    def fib(num):
        """生成器"""
        a, b = 0, 1
        for _ in range(num):
            a, b = b, a + b
            yield a
    
  • 生成器進化爲協程。

    生成器物件可以使用send()方法發送數據,發送的數據會成爲生成器函數中通過yield表達式獲得的值。這樣,生成器就可以作爲協程使用,協程簡單的說就是可以相互共同作業的子程式。

    def calc_avg():
        """流式計算平均值"""
        total, counter = 0, 0
        avg_value = None
        while True:
            value = yield avg_value
            total, counter = total + value, counter + 1
            avg_value = total / counter
    
    
    gen = calc_avg()
    next(gen)
    print(gen.send(10))
    print(gen.send(20))
    print(gen.send(30))
    

併發程式設計

Python中實現併發程式設計的三種方案:多執行緒、多進程和非同步I/O。併發程式設計的好處在於可以提升程式的執行效率以及改善使用者體驗;壞處在於併發的程式不容易開發和偵錯,同時對其他程式來說它並不友好。

  • 多執行緒:Python中提供了Thread類並輔以LockConditionEventSemaphoreBarrier。Python中有GIL來防止多個執行緒同時執行本地位元組碼,這個鎖對於CPython是必須的,因爲CPython的記憶體管理並不是執行緒安全的,因爲GIL的存在多執行緒並不能發揮CPU的多核特性。

    """
    面試題:進程和執行緒的區別和聯繫?
    進程 - 操作系統分配記憶體的基本單位 - 一個進程可以包含一個或多個執行緒
    執行緒 - 操作系統分配CPU的基本單位
    併發程式設計(concurrent programming)
    1. 提升執行效能 - 讓程式中沒有因果關係的部分可以併發的執行
    2. 改善使用者體驗 - 讓耗時間的操作不會造成程式的假死
    """
    import glob
    import os
    import threading
    
    from PIL import Image
    
    PREFIX = 'thumbnails'
    
    
    def generate_thumbnail(infile, size, format='PNG'):
        """生成指定圖片檔案的縮圖"""
    	file, ext = os.path.splitext(infile)
    	file = file[file.rfind('/') + 1:]
    	outfile = f'{PREFIX}/{file}_{size[0]}_{size[1]}.{ext}'
    	img = Image.open(infile)
    	img.thumbnail(size, Image.ANTIALIAS)
    	img.save(outfile, format)
    
    
    def main():
        """主函數"""
    	if not os.path.exists(PREFIX):
    		os.mkdir(PREFIX)
    	for infile in glob.glob('images/*.png'):
    		for size in (32, 64, 128):
                # 建立並啓動執行緒
    			threading.Thread(
    				target=generate_thumbnail, 
    				args=(infile, (size, size))
    			).start()
    			
    
    if __name__ == '__main__':
    	main()
    

    多個執行緒競爭資源的情況。

    """
    多執行緒程式如果沒有競爭資源處理起來通常也比較簡單
    當多個執行緒競爭臨界資源的時候如果缺乏必要的保護措施就會導致數據錯亂
    說明:臨界資源就是被多個執行緒競爭的資源
    """
    import time
    import threading
    
    from concurrent.futures import ThreadPoolExecutor
    
    
    class Account(object):
        """銀行賬戶"""
    
        def __init__(self):
            self.balance = 0.0
            self.lock = threading.Lock()
    
        def deposit(self, money):
            # 通過鎖保護臨界資源
            with self.lock:
                new_balance = self.balance + money
                time.sleep(0.001)
                self.balance = new_balance
    
    
    class AddMoneyThread(threading.Thread):
        """自定義執行緒類"""
    
        def __init__(self, account, money):
            self.account = account
            self.money = money
            # 自定義執行緒的初始化方法中必須呼叫父類別的初始化方法
            super().__init__()
    
        def run(self):
            # 執行緒啓動之後要執行的操作
            self.account.deposit(self.money)
    
    def main():
        """主函數"""
        account = Account()
        # 建立執行緒池
        pool = ThreadPoolExecutor(max_workers=10)
        futures = []
        for _ in range(100):
            # 建立執行緒的第1種方式
            # threading.Thread(
            #     target=account.deposit, args=(1, )
            # ).start()
            # 建立執行緒的第2種方式
            # AddMoneyThread(account, 1).start()
            # 建立執行緒的第3種方式
            # 呼叫執行緒池中的執行緒來執行特定的任務
            future = pool.submit(account.deposit, 1)
            futures.append(future)
        # 關閉執行緒池
        pool.shutdown()
        for future in futures:
            future.result()
        print(account.balance)
    
    
    if __name__ == '__main__':
        main()
    

    修改上面的程式,啓動5個執行緒向賬戶中存錢,5個執行緒從賬戶中取錢,取錢時如果餘額不足就暫停執行緒進行等待。爲了達到上述目標,需要對存錢和取錢的執行緒進行排程,在餘額不足時取錢的執行緒暫停並釋放鎖,而存錢的執行緒將錢存入後要通知取錢的執行緒,使其從暫停狀態被喚醒。可以使用threading模組的Condition來實現執行緒排程,該物件也是基於鎖來建立的,程式碼如下所示:

    """
    多個執行緒競爭一個資源 - 保護臨界資源 - 鎖(Lock/RLock)
    多個執行緒競爭多個資源(執行緒數>資源數) - 號志(Semaphore)
    多個執行緒的排程 - 暫停執行緒執行/喚醒等待中的執行緒 - Condition
    """
    from concurrent.futures import ThreadPoolExecutor
    from random import randint
    from time import sleep
    
    import threading
    
    
    class Account():
        """銀行賬戶"""
    
        def __init__(self, balance=0):
            self.balance = balance
            lock = threading.Lock()
            self.condition = threading.Condition(lock)
    
        def withdraw(self, money):
            """取錢"""
            with self.condition:
                while money > self.balance:
                    self.condition.wait()
                new_balance = self.balance - money
                sleep(0.001)
                self.balance = new_balance
    
        def deposit(self, money):
            """存錢"""
            with self.condition:
                new_balance = self.balance + money
                sleep(0.001)
                self.balance = new_balance
                self.condition.notify_all()
    
    
    def add_money(account):
        while True:
            money = randint(5, 10)
            account.deposit(money)
            print(threading.current_thread().name, 
                  ':', money, '====>', account.balance)
            sleep(0.5)
    
    
    def sub_money(account):
        while True:
            money = randint(10, 30)
            account.withdraw(money)
            print(threading.current_thread().name, 
                  ':', money, '<====', account.balance)
            sleep(1)
    
    
    def main():
        account = Account()
        with ThreadPoolExecutor(max_workers=10) as pool:
            for _ in range(5):
                pool.submit(add_money, account)
                pool.submit(sub_money, account)
    
    
    if __name__ == '__main__':
        main()
    
  • 多進程:多進程可以有效的解決GIL的問題,實現多進程主要的類是Process,其他輔助的類跟threading模組中的類似,進程間共用數據可以使用管道、通訊端等,在multiprocessing模組中有一個Queue類,它基於管道和鎖機制 機製提供了多個進程共用的佇列。下面 下麪是官方文件上關於多進程和進程池的一個範例。

    """
    多進程和進程池的使用
    多執行緒因爲GIL的存在不能夠發揮CPU的多核特性
    對於計算密集型任務應該考慮使用多進程
    time python3 example22.py
    real    0m11.512s
    user    0m39.319s
    sys     0m0.169s
    使用多進程後實際執行時間爲11.512秒,而使用者時間39.319秒約爲實際執行時間的4倍
    這就證明我們的程式通過多進程使用了CPU的多核特性,而且這台計算機設定了4核的CPU
    """
    import concurrent.futures
    import math
    
    PRIMES = [
        1116281,
        1297337,
        104395303,
        472882027,
        533000389,
        817504243,
        982451653,
        112272535095293,
        112582705942171,
        112272535095293,
        115280095190773,
        115797848077099,
        1099726899285419
    ] * 5
    
    
    def is_prime(n):
        """判斷素數"""
        if n % 2 == 0:
            return False
    
        sqrt_n = int(math.floor(math.sqrt(n)))
        for i in range(3, sqrt_n + 1, 2):
            if n % i == 0:
                return False
        return True
    
    
    def main():
        """主函數"""
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                print('%d is prime: %s' % (number, prime))
    
    
    if __name__ == '__main__':
        main()
    

    重點多執行緒和多進程的比較

    以下情況需要使用多執行緒:

    1. 程式需要維護許多共用的狀態(尤其是可變狀態),Python中的列表、字典、集合都是執行緒安全的,所以使用執行緒而不是進程維護共用狀態的代價相對較小。
    2. 程式會花費大量時間在I/O操作上,沒有太多並行計算的需求且不需佔用太多的記憶體。

    以下情況需要使用多進程:

    1. 程式執行計算密集型任務(如:位元組碼操作、數據處理、科學計算)。
    2. 程式的輸入可以並行的分成塊,並且可以將運算結果合併。
    3. 程式在記憶體使用方面沒有任何限制且不強依賴於I/O操作(如:讀寫檔案、通訊端等)。
  • 非同步處理:從排程程式的任務佇列中挑選任務,該排程程式以交叉的形式執行這些任務,我們並不能保證任務將以某種順序去執行,因爲執行順序取決於佇列中的一項任務是否願意將CPU處理時間讓位給另一項任務。非同步任務通常通過多工共同作業處理的方式來實現,由於執行時間和順序的不確定,因此需要通過回撥式程式設計或者future物件來獲取任務執行的結果。Python 3通過asyncio模組和awaitasync關鍵字(在Python 3.7中正式被列爲關鍵字)來支援非同步處理。

    """
    非同步I/O - async / await
    """
    import asyncio
    
    
    def num_generator(m, n):
        """指定範圍的數位生成器"""
        yield from range(m, n + 1)
    
    
    async def prime_filter(m, n):
        """素數過濾器"""
        primes = []
        for i in num_generator(m, n):
            flag = True
            for j in range(2, int(i ** 0.5 + 1)):
                if i % j == 0:
                    flag = False
                    break
            if flag:
                print('Prime =>', i)
                primes.append(i)
    
            await asyncio.sleep(0.001)
        return tuple(primes)
    
    
    async def square_mapper(m, n):
        """平方對映器"""
        squares = []
        for i in num_generator(m, n):
            print('Square =>', i * i)
            squares.append(i * i)
    
            await asyncio.sleep(0.001)
        return squares
    
    
    def main():
        """主函數"""
        loop = asyncio.get_event_loop()
        future = asyncio.gather(prime_filter(2, 100), square_mapper(1, 100))
        future.add_done_callback(lambda x: print(x.result()))
        loop.run_until_complete(future)
        loop.close()
    
    
    if __name__ == '__main__':
        main()
    

    說明:上面的程式碼使用get_event_loop函數獲得系統預設的事件回圈,通過gather函數可以獲得一個future物件,future物件的add_done_callback可以新增執行完成時的回撥函數,loop物件的run_until_complete方法可以等待通過future物件獲得協程執行結果。

    Python中有一個名爲aiohttp的三方庫,它提供了非同步的HTTP用戶端和伺服器,這個三方庫可以跟asyncio模組一起工作,並提供了對Future物件的支援。Python 3.6中引入了asyncawait來定義非同步執行的函數以及建立非同步上下文,在Python 3.7中它們正式成爲了關鍵字。下面 下麪的程式碼非同步的從5個URL中獲取頁面並通過正則表達式的命名捕獲組提取了網站的標題。

    import asyncio
    import re
    
    import aiohttp
    
    PATTERN = re.compile(r'\<title\>(?P<title>.*)\<\/title\>')
    
    
    async def fetch_page(session, url):
        async with session.get(url, ssl=False) as resp:
            return await resp.text()
    
    
    async def show_title(url):
        async with aiohttp.ClientSession() as session:
            html = await fetch_page(session, url)
            print(PATTERN.search(html).group('title'))
    
    
    def main():
        urls = ('https://www.python.org/',
                'https://git-scm.com/',
                'https://www.jd.com/',
                'https://www.taobao.com/',
                'https://www.douban.com/')
        loop = asyncio.get_event_loop()
        cos = [show_title(url) for url in urls]
        loop.run_until_complete(asyncio.wait(cos))
        loop.close()
    
    
    if __name__ == '__main__':
        main()
    

    重點非同步I/O與多進程的比較

    當程式不需要真正的併發性或並行性,而是更多的依賴於非同步處理和回撥時,asyncio就是一種很好的選擇。如果程式中有大量的等待與休眠時,也應該考慮asyncio,它很適合編寫沒有實時數據處理需求的Web應用伺服器。

    Python還有很多用於處理並行任務的三方庫,例如:joblibPyMP等。實際開發中,要提升系統的可延伸性和併發性通常有垂直擴充套件(增加單個節點的處理能力)和水平擴充套件(將單個節點變成多個節點)兩種做法。可以通過訊息佇列來實現應用程式的解耦合,訊息佇列相當於是多執行緒同步佇列的擴充套件版本,不同機器上的應用程式相當於就是執行緒,而共用的分佈式訊息佇列就是原來程式中的Queue。訊息佇列(訊息導向中介層)的最流行和最標準化的實現是AMQP(高階訊息佇列協定),AMQP源於金融行業,提供了排隊、路由、可靠傳輸、安全等功能,最著名的實現包括:Apache的ActiveMQ、RabbitMQ等。

    要實現任務的非同步化,可以使用名爲Celery的三方庫。Celery是Python編寫的分佈式任務佇列,它使用分佈式訊息進行工作,可以基於RabbitMQ或Redis來作爲後端的訊息代理。