怎麼用Python寫一個瀏覽器叢集框架

2023-10-27 15:00:52

這是做什麼用的

框架用途

在採集大量新聞網站時,不可避免的遇到動態載入的網站,這給配模版的人增加了很大難度。本來配靜態網站只需要兩個技能點:xpath和正則,如果是動態網站的還得抓包,遇到加密的還得js逆向。

所以就需要用瀏覽器渲染這些動態網站,來減少了配模板的工作難度和技能要求。動態載入的網站在新聞網站裡佔比很低,需要的硬體資源相對於一個人工來說更便宜。

實現方式

採集框架使用瀏覽器渲染有兩種方式,一種是直接整合到框架,類似GerapyPyppeteer,這個專案你看下原始碼就會發現寫的很粗糙,它把瀏覽器放在_process_request方法裡啟動,然後採集完一個連結再關閉瀏覽器,大部分時間都浪費在瀏覽器的啟動和關閉上,而且採集多個連結會開啟多個瀏覽器搶佔資源。

另一種則是將瀏覽器渲染獨立成一個服務,類似scrapy-splash,這種方式比直接整合要好,本來就是兩個不同的功能,實際就應該解耦成兩個單獨的模組。不過聽前輩說這東西不太好用,會有記憶體漏失的情況,我就沒測試它。

自己實現

原理:在自動化瀏覽器中嵌入http服務實現http控制瀏覽器。這裡我選擇aiohttp+pyppeteer。之前看到有大佬使用go的rod來做,奈何自己不會go語言,還是用Python比較順手。

後面會考慮用playwright重寫一遍,pyppeteer的github說此倉庫不常維護了,建議使用playwright。

開始寫程式碼

web服務

from aiohttp import web

app = web.Application()
app.router.add_view('/render.html', RenderHtmlView)
app.router.add_view('/render.png', RenderPngView)
app.router.add_view('/render.jpeg', RenderJpegView)
app.router.add_view('/render.json', RenderJsonView)

然後在RenderHtmlView類中寫/render.html請求的邏輯。/render.json是用於獲取網頁的某個ajax介面響應內容。有些情況網頁可能不方便解析,想拿到介面的json響應資料。

初始化瀏覽器

瀏覽器只需要初始化一次,所以啟動放到on_startup,關閉放到on_cleanup

c = LaunchChrome()
app.on_startup.append(c.on_startup_tasks)
app.on_cleanup.append(c.on_cleanup_tasks)

其中on_startup_tasks和on_cleanup_tasks方法如下:

async def on_startup_tasks(self, app: web.Application) -> None:
		page_count = 4
		await asyncio.create_task(self._launch())
		app["browser"] = self.browser
		tasks = [asyncio.create_task(self.launch_tab()) for _ in range(page_count-1)]
		await asyncio.gather(*tasks)
		queue = asyncio.Queue(maxsize=page_count+1)
		for i in await self.browser.pages():
				await queue.put(i)
		app["pages_queue"] = queue
		app["screenshot_lock"] = asyncio.Lock()

async def on_cleanup_tasks(self, app: web.Application) -> None:
		await self.browser.close()

page_count為初始化的分頁數,這種常數一般定義到組態檔裡,這裡我圖方便就不寫組態檔了。

首先初始化所有的分頁放到佇列裡,然後存放在app這個物件裡,這個物件可以在RenderHtmlView類裡通過self.request.app存取到, 到時候就能控制使用哪個分頁來存取連結

我還初始化了一個協程鎖,後面在RenderPngView類裡截圖的時候會用到,因為多標籤不能同時截圖,需要加鎖。

超時停止頁面繼續載入

async def _goto(self, page: Optional[Page], options: AjaxPostData) -> Dict:
		try:
				await page.goto(options.url, 
						waitUntil=options.wait_util, timeout=options.timeout*1000)
		except PPTimeoutError:
				#await page.evaluate('() => window.stop()')
				await page._client.send("Page.stopLoading")
		finally:
				page.remove_all_listeners("request")

有時間頁面明明載入出來了,但還在轉圈,因為某個圖片或css等資源存取不到,強制停止載入也不會影響到網頁的內容。

Page.stopLoading和window.stop()都可以停止頁面繼續載入,忘了之前為什麼選擇前者了

定義請求引數

class HtmlPostData(BaseModel):
    url: str
    timeout: float = 30
    wait_util: str = "domcontentloaded"
    wait: float = 0   
    js_name: str = "" 
    filters: List[str] = [] 
    images: bool = 0  
    forbidden_content_types: List[str] = ["image", "media"]
    cache: bool = 1 
    cookie: bool = 0 
    text: bool = 1 
		headers: bool = 1
  • url: 存取的連結
  • timeout: 超時時間
  • wait_util: 頁面載入完成的標識,一般都是domcontentloaded,只有截圖的時候會選擇networkidle2,讓網頁載入全一點。更多的選項的選項請看:Puppeteer waitUntil Options
  • wait: 頁面載入完成後等待的時間,有時候還得等頁面的某個元素載入完成
  • js_name: 預留的引數,用於在頁面存取前載入js,目前就只有一個js(stealth.min.js)用於去瀏覽器特徵
  • filters: 過濾的請求列表, 支援正則。比如有些css請求你不想讓他載入
  • images: 是否載入圖片
  • forbidden_content_types: 禁止載入的資源型別,預設是圖片和視訊。所有的型別見: resourcetype
  • cache: 是否啟用快取
  • cookie: 是否在返回結果裡包含cookie
  • text: 是否在返回結果裡包含html
  • headers: 是否在返回結果裡包含headers

圖片的引數

class PngPostData(HtmlPostData):
    render_all: int = 0
    text: bool = 0
    images: bool = 1
    forbidden_content_types: List[str] = []
    wait_util: str = "networkidle2"

引數和html的基本一樣,增加了一個render_all用於是否擷取整個頁面。截圖的時候一般是需要載入圖片的,所以就啟用了圖片載入

怎麼使用

多個標籤同時採集

預設是啟動了四個分頁,這四個分頁可以同時存取不同連結。如果分頁過多可能會影響效能,不過開了二三十個應該沒什麼問題

請求例子如下:

import sys
import asyncio
import aiohttp

if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def get_sign(session, delay):
    url = f"http://www.httpbin.org/delay/{delay}"
    api = f'http://127.0.0.1:8080/render.html?url={url}'
    async with session.get(api) as resp:
        data = await resp.json()
        print(url, data.get("status"))
        return data

async def main():
    headers = {
        "Content-Type": "application/json",
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
    }
    loop = asyncio.get_event_loop()
    t = loop.time()
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [asyncio.create_task(get_sign(session, i)) for i in range(1, 5)]
        await asyncio.gather(*tasks)
    print("耗時: ", loop.time()-t)

        
if __name__ == "__main__":
    asyncio.run(main())

http://www.httpbin.org/delay後面跟的數位是多少,網站就會多少秒後返回。所以如果同步執行的話至少需要1+2+3+4秒,而多分頁非同步執行的話至少需要4秒

結果如圖,四個連結只用了4秒多點:

攔截指定ajax請求的響應

import json
import sys
import asyncio
import aiohttp

if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def get_sign(session, url):
    api = f'http://127.0.0.1:8080/render.json'
    data = {
        "url": url,
        "xhr": "/api/", # 攔截介面包含/api/的響應並返回
        "cache": 0,
        "filters": [".png", ".jpg"]
    }
    async with session.post(api, data=json.dumps(data)) as resp:
        data = await resp.json()
        print(url, data)
        return data

async def main():
    urls = ["https://spa1.scrape.center/"]
    headers = {
        "Content-Type": "application/json",
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
    }
    loop = asyncio.get_event_loop()
    t = loop.time()
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [asyncio.create_task(get_sign(session, url)) for url in urls]
        await asyncio.gather(*tasks)
    print(loop.time()-t)

        
if __name__ == "__main__":
    asyncio.run(main())

請求https://spa1.scrape.center/這個網站並獲取ajax連結中包含/api/的介面響應資料,結果如圖:

請求一個網站用時21秒,這是因為網站一直在轉圈,其實要的資料已經載入完成了,可能是一些圖示或者css還在請求。

超時強制返回

加上timeout引數後,即使頁面未載入完成也會強制停止並返回資料。如果這個時候已經攔截到了ajax請求會返回ajax響應內容,不然就是返回空

不過好像因為有快取,現在時間不到1秒就返回了

截圖

import json
import sys
import asyncio
import base64
import aiohttp

if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

async def get_sign(session, url, name):
    api = f'http://127.0.0.1:8080/render.png'
    data = {
        "url": url,
        #"render_all": 1,
        "images": 1,
        "cache": 1,
        "wait": 1 
    }
    async with session.post(api, data=json.dumps(data)) as resp:
        data = await resp.json()
        if data.get('image'):
            image_bytes = base64.b64decode(data["image"])
            with open(name, 'wb') as f:
                f.write(image_bytes)
            print(url, name, len(image_bytes))
        return data

async def main():
    urls = [
        "https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&tn=44004473_102_oem_dg&wd=%E5%9B%BE%E7%89%87&rn=50",
        "https://www.toutiao.com/article/7145668657396564518/",
        "https://new.qq.com/rain/a/NEW2022092100053400",
        "https://new.qq.com/rain/a/DSG2022092100053300"
    ]
    headers = {
        "Content-Type": "application/json",
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36',
    }
    loop = asyncio.get_event_loop()
    t = loop.time()
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [asyncio.create_task(get_sign(session, url, f"{n}.png")) for n,url in enumerate(urls)]
        await asyncio.gather(*tasks)
    print(loop.time()-t)


if __name__ == "__main__":
    asyncio.run(main())

整合到scrapy

import json
import logging
from scrapy.exceptions import NotConfigured

logger = logging.getLogger(__name__)

class BrowserMiddleware(object):
    def __init__(self, browser_base_url: str):
        self.browser_base_url = browser_base_url
        self.logger = logger
        
    @classmethod
    def from_crawler(cls, crawler):
        s = crawler.settings
        browser_base_url = s.get('PYPPETEER_CLUSTER_URL')
        if not browser_base_url:
            raise NotConfigured
        o = cls(browser_base_url)
        return o
    
    def process_request(self, request, spider):
        if "browser_options" not in request.meta or request.method != "GET":
            return
        browser_options = request.meta["browser_options"]
        url = request.url
        browser_options["url"] = url
        uri = browser_options.get('browser_uri', "/render.html")
        browser_url = self.browser_base_url.rstrip('/') + '/' + uri.lstrip('/')
        new_request = request.replace(
            url=browser_url,
            method='POST',
            body=json.dumps(browser_options)
        )
        new_request.meta["ori_url"] = url
        return new_request

    def process_response(self, request, response, spider):
        if "browser_options" not in request.meta or "ori_url" not in request.meta:
            return response
        try:
            datas = json.loads(response.text)
        except json.decoder.JSONDecodeError:
            return response.replace(url=url, status=500)
        datas = self.deal_datas(datas)
        url = request.meta["ori_url"]
        new_response = response.replace(url=url, **datas)
        return new_response
    
    def deal_datas(self, datas: dict) -> dict:
        status = datas["status"]
        text: str = datas.get('text') or datas.get('content')
        headers = datas.get('headers')
        response = {
            "status": status,
            "headers": headers,
            "body": text.encode()
        }
        return response            

開始想用aiohttp來請求,後面想了下,其實都要替換請求和響應,為什麼不直接用scrapy的下載器

完整原始碼

現在還只是個半成品玩具,還沒有用於實際生產中,叢集打包也沒做。有興趣的話可以自己完善一下

如果感興趣的人比較多,後面也會系統的完善一下,打包成docker和釋出第三方庫到pypi

github:https://github.com/kanadeblisst00/browser_cluster