Python數據抓取——多執行緒,非同步

2020-08-12 17:35:03

本文主要是爲了加快數據抓取任務,考慮使用多進程、多執行緒、非同步原理,相關概念可以參考 
https://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/0013868322563729e03f6905ea94f0195528e3647887415000

操作系統可以同時執行多個任務。首先,考慮單核CPU是如何執行多工的:操作系統輪流讓各個任務交替執行,任務1執行0.01秒,切換到任務2,任務2執行0.01秒,再切換到任務3,執行0.01秒……這樣反覆 反復執行下去。表面上看,每個任務都是交替執行的,但是,由於CPU的執行速度非常快,給人的感覺就像所有任務都在同時執行一樣。真正的並行執行多工只能在多核CPU上實現,但是,由於任務數量遠遠多於CPU的核心數量,所以,操作系統也會自動把很多工輪流排程到每個核心上執行

對於操作系統來說,一個任務就是一個進程(Process),比如開啓一個瀏覽器就是啓動一個瀏覽器進程,開啓一個記事本就啓動了一個記事本進程,開啓兩個記事本就啓動了兩個記事本進程,開啓一個Word就啓動了一個Word進程。有些進程還不止同時幹一件事,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個進程內部,要同時幹多件事,就需要同時執行多個「子任務」,我們把進程內的這些「子任務」稱爲執行緒(Thread)。由於每個進程至少要幹一件事,所以一個進程至少有一個執行緒多執行緒的執行方式和多進程是一樣的,也是由操作系統在多個執行緒之間快速切換,讓每個執行緒都短暫地交替執行,看起來就像同時執行一樣,真正能同時執行多執行緒需要多核CPU纔可能實現

我們前面編寫的所有的Python程式,都是執行單任務的進程,也就是隻有一個執行緒。如果要同時執行多個任務有3種方案:一種是啓動多個進程,每個進程只開一個執行緒,但多個進程可以一塊執行多個任務。還有一種方法是啓動一個進程,在一個進程內啓動多個執行緒,多個執行緒也可以一塊執行多個任務。第三種方法,就是啓動多個進程,每個進程再啓動多個執行緒,這樣同時執行的任務就更多了,這種模型很複雜,實際很少採用。多工的實現有3種方式:多進程模式;多執行緒模式;多進程+多執行緒模式。同時執行多個任務通常各個任務之間需要相互通訊和協調,有時,任務1必須暫停等待任務2完成後才能 纔能繼續執行,有時,任務3和任務4又不能同時執行,所以,多進程和多執行緒的程式的複雜度要遠遠高於我們前面寫的單進程單執行緒的程式。因爲複雜度高,偵錯困難,所以,不是迫不得已,我們也不想編寫多工。但是,有很多時候,沒有多工還真不行。想想在電腦上看電影,就必須由一個執行緒播放視訊,另一個執行緒播放音訊,否則,單執行緒實現的話就只能先把視訊播放完再播放音訊,或者先把音訊播放完再播放視訊,這顯然是不行的。

Python既支援多進程,又支援多執行緒。多工可以由多進程完成,也可以由一個進程內的多執行緒完成。進程是由若幹線程組成的,一個進程至少有一個執行緒。由於執行緒是操作系統直接支援的執行單元,因此,高階語言通常都內建多執行緒的支援,Python也不例外,並且,Python的執行緒是真正的Posix Thread,而不是模擬出來的執行緒。Python的標準庫提供了兩個模組:thread和threading,thread是低階模組,threading是高階模組,對thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高階模組。啓動一個執行緒就是把一個函數傳入並建立Thread範例,然後呼叫start()開始執行。

import requests
import threading

def get_stock(code):
    url = 'http://hq.sinajs.cn/list=' + code
    resp = requests.get(url)
    print('%s\n' % resp.text)

#多執行緒非同步,加速抓取
#根據有幾個股票程式碼,就建立幾個執行緒
codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
threads = [threading.Thread(target=get_stock, args=(code, )) for code in codes]
#Thread建立執行緒範例
'''
threads=[ ]
for code in codes:
    thread=threading.Thread(target=get_stock,args=(code, ))
    threads.append(thread)
'''
for t in threads:
    t.start()  #啓動一個執行緒
for t in threads:
    t.join()  #等待每個執行緒執行結束

这里写图片描述

多工用執行緒池自動排程

import requests
import threadpool  #執行緒池

def get_stock(code):
    url = 'http://hq.sinajs.cn/list=' + code
    resp = requests.get(url)
    print('%s\n' % resp.text)

codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
#codes裡任務很多,比如幾百個,讓pool自己去排程
pool = threadpool.ThreadPool(2) #執行緒池設定,最多同時跑兩個執行緒
tasks = threadpool.makeRequests(get_stock, codes)
#makeRequests構造執行緒task請求,第一個參數是執行緒函數,第二個是參數陣列
[pool.putRequest(task) for task in tasks]
#列表推導式,putRequest向執行緒池裏加task,讓pool自己去排程task
pool.wait() #等所有任務結束

这里写图片描述

非同步 
交出當前CPU的控制權,最大化利用當前單個CPU的效率

import aiohttp #表示http請求是非同步方式去請求的
import asyncio #當非同步請求返回時,通知非同步操作完成

#非同步可以參考grequests庫的使用:https://github.com/kennethreitz/grequests
async def get_stock(code):
#關鍵字async表示請求是非同步的
    url = 'http://hq.sinajs.cn/list=' + code
    resp = await aiohttp.request('GET', url) # yield
    #await表示任務等待時,不佔用CPU資源,通知請求返回
    body = await resp.read()
    #表示從網路上把請求的東西都讀回來
    text = body.decode('gb2312') #對讀回來的原始位元組解碼
    print(text)
    resp.close()

codes = ['sz000878', 'sh600993', 'sz000002', 'sz002230']
tasks = [get_stock(code) for code in codes]
#由於是非同步請求,這裏get_stock(code)並不會被馬上執行,只是佔用了一個位置

loop = asyncio.get_event_loop()  #loop的作用是——做完任務,事件通知
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

#tasks生成一組併發的非同步任務,loop表示非同步作用完成後等待通知