最近把Python寫的資料採集平臺往.Net Core上遷移,原本的採集任務使用多程序+執行緒池的方式來加快採集速度,使用Celery作為非同步任務佇列兼具定時任務功能,這套東西用著還行,但反正就折騰嘛,直接上C#~
本文記錄 Hangfire 在實際應用裡的用法,我發現網路上找到的大部分文章都是用 Hangfire 的非同步任務輸出個 Hello World,然後就沒了。我實在不知道這樣的文章寫了有什麼意義??除了浪費看的人的時間之外,還浪費自己寫文章的時間……
.NetCore作為一個高效能的平臺,自然不可能輸給Python,不過我不想造輪子了(菜),找個現成的方案來用,免得踩坑~
先是調研了一下.NetCore目前的生態,發現有幾個選擇:
第一個 FreeScheduler 和 FreeSQL 專案出自同一個作者之手,剛好我的專案也是用的 FreeSQL 作為 ORM,不過看了一下Github上stars比較少,而且檔案暫時還不完善,我最關心的依賴注入功能暫時還不好搞,於是只好作罷。
然後在 Quartz.net 和 Hangfire 兩者中,我選擇了後者,原因是我之前做 CrawlCenterNet 專案的時候用過 Hangfire,還挺好用的,且帶有一個簡單的 dashboard,比較直觀~
那麼就開始吧~
這裡的後端指的是任務佇列的儲存後端,也就是 Hangfire 檔案中寫的 Storage。
看了一下官網,大部分關係型非關係型資料庫都是可以的,那我就放心了。由於目前生產環境在使用 Oracle 資料庫,所以一開始我選擇了 Oracle 作為 Storage,但是在同時開到2000多個任務的時候報錯了,看了下原因是表空間不足,是 Oracle 的問題…
所以為了效能和穩定性,我棄坑了,接著嘗試了 SQLite (僅作為本地測試),結果發現設定裡面定義了 Connection String 但它不理我,直接把這個 Connection String 作為資料庫的名稱了,無語… 這樣就沒辦法把 SQLite 資料庫設定為非同步模式,那速度就直接烏龜爬了…
再次棄坑… 這次直接上 Redis 了,為了提高效能,捨棄持久化能力~ Redis也沒讓我失望,幾千個任務壓根不帶眨眼的,nice~
Hangfire 元件不是一開始就引入的,這裡先上最基本的資料採集程式碼,後面的介紹才能更清楚~
關鍵的程式碼在 Services/CrawlService.cs
檔案中
public class CrawlService {
// 依賴注入一些服務
private readonly IBaseRepository<Proc> _repo;
public async Task CrawlAllProc() {
for(var i=1; i<2000; i++) {
await CrawlProcList(i);
}
}
public async Task CrawlProcList(int page) {
// 具體程式碼省略了
var procList = ; //...
foreach (var proc in procList) {
await CrawlProc(proc);
}
}
public async Task CrawlProc(Proc proc) { }
}
然後,當啟動採集任務的時候,直接去呼叫 CrawlAllProc
方法,這樣就開始一頁一頁採集,每頁又有很多的 Proc
資料,全都採集下來。
上面的程式碼用的是 await
,會等待非同步方法完成,速度很慢,去掉 await
,在新執行緒中執行任務,不等待其結束,不過問題也很明顯,如果出錯了很難偵錯,這樣就不好保證系統的穩定性。
接下來我們用 Hangfire 來改造。
本專案用到了以下依賴:
直接 nuget 一把梭完事
為了跟後面的內容區分,這裡先來官方的例子
註冊服務:
services.AddHangfire(
configuration => configuration
.SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
// 根據實際使用的 Storage 來註冊
.UseRedisStorage();
services.AddHangfireServer();
新增中介軟體:
app.UseHangfireDashboard();
Hangfire 註冊的時候預設是單例模式,所以在任意程式碼中使用其靜態方法就能新增非同步任務或者定時任務。
非同步任務:
BackgroundJobs.Enqueue(() => Console.WriteLine("Hello world!"));
定時任務:
Hangfire的定時任務叫做 recurrent tasks,我之前一般習慣叫 scheduled task,一開始差點找不到檔案~
以下程式碼新增一個每天執行一次的任務,如果需要其他時間,可以自定義後面的 Cron 引數,具體自行研究 Cron 語法~
RecurringJob.AddOrUpdate("easyjob", () => Console.Write("Easy!"), Cron.Daily);
OK,終於進到正文
正如我一開始說的,前面介紹的用法是遠遠不夠的,如果只是介紹個 Hello World,那也沒必要專門寫篇文章了…
現在開始介紹如何將 Hangfire 結合我的 CrawlService
使用
因為 CrawlService
需要運算元據庫,所以是用到了依賴注入的,所以我們需要讓 Hangfire 也支援依賴注入。
官方檔案有一小節是關於 IOC 容器的(https://docs.hangfire.io/en/latest/background-methods/using-ioc-containers.html),不過並沒有介紹 AspNetCore 的容器,直接自己動手 豐衣足食咯~
一開始搜了好久沒找到,最終是在Github上找到一個例子程式碼,裡面的 AspNetCore 版本好老,居然是1.1版本,我都沒用過… 不過並不影響我節儉他的寫法~
這一步需要 JobActivator
的子類
來寫一個,我把它放在 Infrastructure
目錄下
using Hangfire;
public class HangfireActivator : JobActivator {
private readonly IServiceProvider _serviceProvider;
public HangfireActivator(IServiceProvider serviceProvider) {
_serviceProvider = serviceProvider;
}
public override object? ActivateJob(Type jobType) {
return _serviceProvider.GetService(jobType);
}
}
這裡是在 HangfireActivator
的建構函式中把 AspNetCore 的 IOC 容器物件傳入,並且重寫 ActivateJob
方法,讓 Hangfire 才啟用任務的時候從 IOC 容器中獲取物件,比較好理解。
其實服務註冊部分是一樣的,無須修改
不過按照習慣,為了使 Program.cs
或者 Startup.cs
程式碼比較簡潔,我還是寫了擴充套件方法來實現這部分。
在 Extensions
目錄中新增 ConfigureHangfire.cs
public static class ConfigureHangfire {
public static void AddHangfirePkg(this IServiceCollection services, IConfiguration configuration) {
services.AddHangfire(conf => conf
.SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
.UseSimpleAssemblyNameTypeSerializer()
.UseRecommendedSerializerSettings()
.UseRedisStorage()
);
services.AddHangfireServer();
}
public static void UseHangfire(this WebApplication app) {
GlobalConfiguration.Configuration.UseActivator(new HangfireActivator(app.Services));
app.UseHangfireDashboard();
}
}
可以看到有修改的地方就是在新增中介軟體之前,設定了 Activator
這行程式碼:
GlobalConfiguration.Configuration.UseActivator(new HangfireActivator(app.Services));
直接把 IOC 容器傳入
搞定~
接著在 Program.cs
(我用的 .Net6)中使用這個擴充套件方法就完事了~
builder.Services.AddHangfirePkg(builder.Configuration);
// 中介軟體
app.UseHangfire();
有了依賴注入之後,建立非同步任務是這樣的。也就是多了個泛型引數。
BackgroundJob.Enqueue<CrawlService>(a => a.CrawlAllProc());
定時任務,每小時執行一次
RecurringJob.AddOrUpdate<CrawlService>(a => a.CrawlAllProc(), Cron.Hourly);
OK,最後回到一開始的資料採集程式碼,做如下修改:
public class CrawlService {
// 依賴注入一些服務
private readonly IBaseRepository<Proc> _repo;
public async Task CrawlAllProc() {
for(var i=1; i<2000; i++) {
// await CrawlProcList(i);
BackgroundJob.Enqueue(() => CrawlProcList(i, 100));
}
}
public async Task CrawlProcList(int page, int pageSize = 100) {
// 具體程式碼省略了
var procList = ; //...
foreach (var proc in procList) {
// await CrawlProc(proc);
BackgroundJob.Enqueue(() => CrawlProc(proc));
}
}
public async Task CrawlProc(Proc proc) { }
}
把原來 await
的地方註釋掉,換成用 Hangfire 建立非同步任務,執行起來,開啟dashboard,可以看到任務噌的一下就上到幾千,速度極快~
需要注意的就是
CrawlProcList
方法的第二個引數pageSize
我們給了預設值100,在正常使用是沒問題的,可以不傳入這個引數,預設就是100。但
BackgroundJob.Enqueue
方法裡不能省略這個引數,不然會報錯說編譯器無法解析啥的,這個應該是C#的語言限制,具體我暫時還沒去深入研究。
OK,這樣就初步搞定了資料採集 & 定時採集的功能,這部分剛好是我國慶第一天加班完成的,後續的就交給時間吧~ 國慶剩下幾天的假期讓它跑個夠,等假期結束再回去看看效果如何,到時有新的進展我也會及時更新部落格。
對了,我還打算封裝個非同步任務和定時任務的介面(似乎 AspNetCore 沒有這部分功能?),因為我不想程式碼和 Hangfire 有太高的耦合,封裝成抽象的介面,以後如果換別的元件也沒有壓力。
就把這件事先加入 todo list 吧~