CPU的速度遠遠快於磁碟、網路等IO。在一個執行緒中,CPU執行程式碼的速度極快,然而,一旦遇到IO操作,如讀寫檔案、發送網路數據時,就需要等待IO操作完成,才能 纔能繼續進行下一步操作。這種情況稱爲同步IO。
在IO操作的過程中,當前執行緒被掛起,而其他需要CPU執行的程式碼就無法被當前執行緒執行了。
因爲一個IO操作就阻塞了當前執行緒,導致其他程式碼無法執行,所以我們必須使用多執行緒或者多進程來併發執行程式碼,爲多個使用者服務。每個使用者都會分配一個執行緒,如果遇到IO導致執行緒被掛起,其他使用者的執行緒不受影響。
多執行緒和多進程的模型雖然解決了併發問題,但是系統不能無上限地增加執行緒。由於系統切換執行緒的開銷也很大,所以,一旦執行緒數量過多,CPU的時間就花線上程切換上了,真正執行程式碼的時間就少了,結果導致效能嚴重下降。
由於我們要解決的問題是CPU高速執行能力和IO裝置的龜速嚴重不匹配,多執行緒和多進程只是解決這一問題的一種方法。
另一種解決IO問題的方法是非同步IO。當程式碼需要執行一個耗時的IO操作時,它只發出IO指令,並不等待IO結果,然後就去執行其他程式碼了。一段時間後,當IO返回結果時,再通知CPU進行處理。
可以想象如果按普通順序寫出的程式碼實際上是沒法完成非同步IO的,非同步IO模型需要一個訊息回圈,在訊息回圈中,主執行緒不斷地重複「讀取訊息-處理訊息」這一過程。
在「發出IO請求」到收到「IO完成」的這段時間裏,同步IO模型下,主執行緒只能掛起,但非同步IO模型下,主執行緒並沒有休息,而是在訊息回圈中繼續處理其他訊息。這樣,在非同步IO模型下,一個執行緒就可以同時處理多個IO請求,並且沒有切換執行緒的操作。對於大多數IO密集型的應用程式,使用非同步IO將大大提升系統的多工處理能力。
協程,又稱微執行緒,纖程。英文名Coroutine。
子程式,或者稱爲函數,在所有語言中都是層級呼叫,比如A呼叫B,B在執行過程中又呼叫了C,C執行完畢返回,B執行完畢返回,最後是A執行完畢。
所以子程式呼叫是通過棧實現的,一個執行緒就是執行一個子程式。
子程式呼叫總是一個入口,一次返回,呼叫順序是明確的。而協程的呼叫和子程式不同。
協程看上去也是子程式,但執行過程中,在子程式內部可中斷,然後轉而執行別的子程式,在適當的時候再返回來接着執行。
注意,在一個子程式中中斷,去執行其他子程式,不是函數呼叫,有點類似CPU的中斷。比如子程式A、B,但是在A中是沒有呼叫B的,所以協程的呼叫比函數呼叫理解起來要難一些。
看起來A、B的執行有點像多執行緒,但協程的特點在於是一個執行緒執行,那和多執行緒比,協程有何優勢?
最大的優勢就是協程極高的執行效率。因爲子程式切換不是執行緒切換,而是由程式自身控制,因此,沒有執行緒切換的開銷,和多執行緒比,執行緒數量越多,協程的效能優勢就越明顯。
第二大優勢就是不需要多執行緒的鎖機制 機製,因爲只有一個執行緒,也不存在同時寫變數衝突,在協程中控制共用資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。
因爲協程是一個執行緒執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的效能。
Python對協程的支援是通過generator實現的。
生產者生產訊息後,直接通過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高:
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
複製程式碼
執行結果:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
複製程式碼
注意到consumer函數是一個generator,把一個consumer傳入produce後:
整個流程無鎖,由一個執行緒執行,produce和consumer共同作業完成任務,所以稱爲「協程」,而非執行緒的搶佔式多工。
Python實現非同步IO非常簡單,asyncio是Python 3.4版本引入的標準庫,直接內建了對非同步IO的支援。asyncio
的程式設計模型就是一個訊息回圈。我們從asyncio
模組中直接獲取一個EventLoop
的參照,然後把需要執行的協程扔到EventLoop
中執行,就實現了非同步IO。
用asyncio的非同步網路連線來獲取sina、sohu和163的網站首頁程式碼如下:
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
複製程式碼
執行結果如下:
wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(列印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(列印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(列印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...
複製程式碼
@asyncio.coroutine
把一個generator標記爲coroutine型別,然後,我們就把這個coroutine
扔到EventLoop
中執行。yield from
語法可以讓我們方便地呼叫另一個generator。所以執行緒不會等待IO操作,而是直接中斷並執行下一個訊息回圈。當yield from
返回時,執行緒就可以從yield from
拿到返回值,然後接着執行下一行語句。
在此期間,主執行緒並未等待,而是去執行EventLoop
中其他可以執行的coroutine
了,因此我們用Task封裝的三個coroutine
可以實現由同一個執行緒併發執行。
爲了簡化並更好地標識非同步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的程式碼更簡潔易讀。
使用新語法,只需要做兩步簡單的替換:
@asyncio.coroutine
替換爲async
;yield from
替換爲await
。用新語法重新編寫上一節的程式碼如下:
import asyncio
async def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = await connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
await writer.drain()
while True:
line = await reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
複製程式碼
剩下的程式碼保持不變。
asyncio
實現了TCP、UDP、SSL等協定,aiohttp
則是基於asyncio
實現的HTTP框架。
安裝aiohttp:
pip install aiohttp
複製程式碼
然後編寫一個HTTP伺服器,分別處理以下URL:
/
- 首頁返回b'<h1>Index</h1>'
;/hello/{name}
- 根據URL參數返迴文字hello, %s!
。程式碼如下:
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
複製程式碼
注意aiohttp
的初始化函數init()
也是一個coroutine
,loop.create_server()
則利用asyncio
建立TCP服務。