.NET與巨量資料

2023-02-03 21:01:49

前言

當別人做巨量資料用Java、Python的時候,我使用.NET做巨量資料、資料探勘,這確實是值得一說的事。
寫的並不全面,但都是實際工作中的內容。

.NET在巨量資料專案中,可以做什麼?

  1. 寫指令碼(使用控制檯程式+頂級語句)
  2. 寫工具(使用Winform)
  3. 寫介面、寫服務

使用C#寫程式碼的優點是什麼?

  1. 靜態型別+匿名型別,一次性使用的實體類就用匿名型別,多次或多個地方使用的實體類就用靜態型別,靜態型別優於Python,匿名型別優於Java。你是不是想說Python也有靜態型別?你倒是寫啊?!
  2. 程式碼的可維護性好,這是相對於Python說的,不一定是語言的鍋,還有固有的程式碼組織習慣,靜態型別本身就是很好的註釋
  3. 效能好,非同步並行的程式碼易編寫。
    想起來一個事,就是前同事用Python2做資料探勘,先用的es,效能差,改用的clickhouse,我就納悶,es效能差?現在我想我明白了,我看了其中一個挖掘演演算法,它需要在雙層迴圈遍歷中去請求es進行查詢,它沒有使用非同步,也沒有使用多執行緒,那不就是一個執行緒在查詢嗎?我們現網es伺服器設定這麼強這麼多,它居然用一個執行緒去同步請求,能快才怪!實際上一個請求耗時極短,因為es有各種快取,而查詢條件精確可以命中快取,所以可以並行請求多個es節點。
    那前同事為什麼沒有使用非同步並行或多執行緒呢?Python2不支援嗎?或者Python2支援,但寫起來不方便?或者前同事不會寫?(原因:寫起來不方便,C#一樣也不太方便,而且會使整個程式的並行請求量變得難以控制,可以針對某個介面單獨優化,但所有介面都這樣寫,也挺麻煩的)

使用.NET開發的優點是什麼?

其中一個優點是應用程式型別豐富,目前我用到的應用程式型別有:

  1. 控制檯
  2. Winform
  3. Web API
  4. Blazor
    你是不是想說Java和Python也可以寫控制檯、表單程式、Web API?一個熟悉Ptyhon的程式設計師,可不一定會寫表單程式,需要一點時間學習,一個做了幾年.NET的程式設計師天然會寫Winform,就是拖控制元件啊。當然,也可能他們不用Windows。
    每一種應用程式型別,都意味著學習成本,而這些我已經會了,時間就省下了(Blazor一開始不會,學習花了一兩天)。

.NET與ClickHouse

我寫了一個大雜燴指令碼專案,裡面有很多工程是查詢ClickHouse統計分析,程式碼流程就是讀取Excel資料作為查詢輸入條件,查詢ClickHouse統計分析,統計結果匯出到Excel。一個統計分析工作任務小半天就完成了。
用的ORM是我自己寫的Dapper.LiteSql。沒什麼人用,可能是功能不強吧。不過很適合我自己的需求,我自己經常用。
比如:

int count = session.CreateSql<XXX>(@"
    select count(distinct t.xxx, t.xxx, t.xxx) as cnt
    from xxx t
")
.Where(t => t.PassTime >= startTime && t.PassTime <= endTime)
.Where("t.Name in (" + kkNames + ")")
.QuerySingle<int>();

再比如:

var query = session.CreateSql<XXX>(@"
        select t.xxx, t.xxx, t.xxx
        from xxx t
    ")
    .Where(t => t.PassTime >= firstTime && t.PassTime <= firstTime.AddDays(7).AddSeconds(-1));
query.Where(t => plateList.Skip((page - 1) * pageSize).Take(pageSize).ToList().Contains(t.PlateNo));
var temp = query.ToList();

對於統計查詢,我經常SQL和Lambda表示式混寫,感覺這樣非常靈活。
某些情況下,混寫比純Lambda寫法,是要清晰的:

List<XXX> list = session.CreateSql<XXX>(@"
    select xxx, xxx as xxx, max(xxx) as xxx
    from (
    select xxx, toDate(xxx) as xxx, xxx, count(*) as xxx
    from (
    select distinct t.xxx, t.xxx, t.xxx
    from xxx t
").Where(t => t.Xxx != "xxx")
.Where(t => t.XxxTime >= startTime && t.XxxTime <= endTime)
.Where(t => xxxList.Contains(t.Xxx))
.Where(@"(
    (formatDateTime(t.xxx_time ,'%H:%M:%S') >= '07:00:00' and formatDateTime(t.xxx_time ,'%H:%M:%S') <= '08:59:59') or
    (formatDateTime(t.xxx_time ,'%H:%M:%S') >= '14:00:00' and formatDateTime(t.xxx_time ,'%H:%M:%S') <= '20:59:59')
)")
.Append(@")")
.GroupBy("xxx, xxx, xxx")
.Append(@") 
    group by xxx, xxx
")
.QueryList<XXX>();

上述程式碼說明:

  1. group by寫了兩種寫法比較隨意
  2. 三層select巢狀,當然主流ORM都能實現,但不一定易編寫、易閱讀
  3. 我不用針對ClickHouse去實現formatDateTime,也不用實現toDate、max、distinct、count,也不用糾結是count(*)還是count(1),只要實現的功能足夠少,BUG就少。

.NET與ElasticSearch

本打算使用Elasticsearch.Net,為什麼沒有使用?

  1. 學習成本,專案中沒有學習時間,雖然造測試資料是本職工作,但寫小工具不是本職工作不能耽誤太多時間,所以沒有學習時間
  2. 我使用HttpClient查詢es,這種查詢es的方式和kibana中寫的查詢語句、以及前同事留下的建立索引的檔案、模板最接近,方便抄現成的。下面是一個完整的查詢es方法:
public async Task<TicketAgg> QueryAgg(string strStartTime, string strEndTime, string idCard)
{
    Stopwatch sw = Stopwatch.StartNew();

    string esUrl = $"http://{esIPs[_rnd.Next(0, esIPs.Length)]}:24100/out_xxx/_search";

    var esQueryBody = new
    {
        size = 0,
        query = new
        {
            @bool = new
            {
                must = new dynamic[]
                {
                    new
                    {
                        range = new
                        {
                            travel_time = new
                            {
                                gte = strStartTime,
                                lte = strEndTime,
                                format = "yyyyMMddHHmmss"
                            }
                        }
                    },
                    new
                    {
                        match_phrase = new
                        {
                            zjhm = idCard
                        }
                    }
                }
            }
        },
        aggs = new
        {
            countByZjhm = new
            {
                terms = new
                {
                    field = "zjhm",
                    size = 10000
                }
            }
        }
    };

    string esPostData = JsonConvert.SerializeObject(esQueryBody);
    Console.WriteLine($"ES請求URL:{esUrl}");
    Console.WriteLine($"ES請求引數:{esPostData}");
    HttpClient httpClient = HttpClientFactory.GetClient();
    HttpContent content = new StringContent(esPostData);
    content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
    string strEsResult = await (await httpClient.PostAsync(esUrl, content)).Content.ReadAsStringAsync();
    var resultObj = new
    {
        took = 0,
        aggregations = new
        {
            countByZjhm = new
            {
                buckets = new[]
                {
                    new
                    {
                        key = "",
                        doc_count = 0
                    }
                }
            }
        }
    };
    var esResult = JsonConvert.DeserializeAnonymousType(strEsResult, resultObj);

    TicketAgg agg = new TicketAgg();
    agg.IdCard = idCard;
    agg.Count = esResult.aggregations.countByZjhm.buckets[0].doc_count;

    sw.Stop();
    Console.WriteLine($"統計資料,耗時:{sw.Elapsed.TotalSeconds.ToString("0.000")} 秒");

    return agg;
}

程式碼中esQueryBody和resultObj都是一次性使用的,直接用匿名動態型別,而TicketAgg是需要範例化作為返回值給其它方法使用的,所以定義成靜態型別。
評論區有人問可選條件怎麼寫,程式碼如下:

string strStartTime = DateTime.Now.AddDays(-7).ToString("yyyyMMddHHmmss");
string strEndTime = DateTime.Now.ToString("yyyyMMddHHmmss");
string idCard = "33";

var esQueryBody = new
{
    size = 10000,
    query = new
    {
        @bool = new
        {
            must = new List<dynamic>
            {
                new
                {
                    range = new
                    {
                        travel_time = new
                        {
                            gte = strStartTime,
                            lte = strEndTime,
                            format="yyyyMMddHHmmss"
                        }
                    }
                }
            }
        }
    }
};

if (idCard != null)
{
    [email protected](new
    {
        match_phrase = new
        {
            zjhm = idCard
        }
    });
}

string esPostData = JsonConvert.SerializeObject(esQueryBody);

上述程式碼說明:

  1. must原來是dynamic[],它的長度是不可變的,不方便追加,所以修改成List,就可以動態追加了。
  2. 寫這段程式碼,我沒有百度,沒有找檔案,花了幾分鐘試出來的。優秀的語法可以讓使用者舉一反三。

下面一段程式碼,生產測試資料用的:

public async Task MockXxxData(string indexName, int count, DateTime startDate, DateTime endDate, string[] departures, string[] destinations, dynamic peoples)
{
    int days = (int)endDate.Subtract(startDate).TotalDays;

    List<Task> taskList = new List<Task>();
    for (int i = 0; i < count; i++)
    {
        DateTime date = startDate.AddDays(_rnd.Next(0, days + 1));
        long time = (long)(_rnd.NextDouble() * 3600 * 24);
        var people = peoples[_rnd.Next(0, peoples.Length)];

        var esRequestBody = new
        {
            xxx_type = _rnd.Next(1, 4).ToString(),
            zjlx = "xxx",
            zjhm = people.zjhm,
            xm = people.xm,
            departure = departures[_rnd.Next(0, departures.Length)],
            destination = destinations[_rnd.Next(0, destinations.Length)],
            xxx_date = date.ToString("yyyyMMdd"),
            xxx_time = date.AddSeconds(time).ToString("yyyyMMddHHmmss"),
            xxx_time = date.AddSeconds(time).AddHours(0.5 + _rnd.NextDouble()).ToString("yyyyMMddHHmmss"),
            xxx_time = date.AddSeconds(time).AddDays(-2 + _rnd.NextDouble()).ToString("yyyyMMddHHmmss"),
            xxx = "",
            xxx = ""
        };

        var task = ServiceFactory.Get<EsWriteService>().Write(indexName, esRequestBody);
        taskList.Add(task);
    }
    await Task.WhenAll(taskList);
}

上述程式碼說明:

  1. 程式跑起來生產資料,一般會有幾十個執行緒,也就是請求es的並行量是幾十
  2. 如果你覺得幾十的並行量,還是有點高,可以在呼叫的Write非同步方法中使用Semaphore類限制一下並行量,程式碼如下:
private Semaphore _sem = new Semaphore(20, 20); //限制非同步請求的並行數量

public async Task<bool> Write(string indexName, dynamic esRequestBody)
{
    _sem.WaitOne();
    try
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();

        indexName = $"{indexName}-{DateTime.Now.Year}-{DateTime.Now.Month:00}";
        string esUrl = $"http://{esIPs[_rnd.Next(0, esIPs.Length)]}:24100/{indexName}/doc";

        string esRequestData = JsonConvert.SerializeObject(esRequestBody);
        HttpContent content = new StringContent(esRequestData);
        content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
        HttpClient httpClient = HttpClientFactory.GetClient();
        string strEsResult = await (await httpClient.PostAsync(esUrl, content)).Content.ReadAsStringAsync();
        var resultObj = new
        {
            status = 0
        };
        var esResult = JsonConvert.DeserializeAnonymousType(strEsResult, resultObj);

        sw.Stop();
        _log?.Info($"【寫入ES索引】【{(esResult.status == 0 ? "成功" : "失敗")}】耗時:{sw.Elapsed.TotalSeconds:0.000} 秒,索引名稱:{indexName},請求URL:{esUrl},請求引數:{esRequestData}");
        return esResult.status == 0;
    }
    catch
    {
        throw;
    }
    finally
    {
        _sem.Release();
    }
}

用到的庫

評論區有人問技術棧,這裡列一下主要的庫:

  1. Microsoft.Extensions.DependencyInjection 和 Autofac (依賴注入)
  2. AutoMapper (實體類對映)
  3. Microsoft.Extensions.Http (HttpClient,用於操作ElasticSearch、網路請求)
  4. Quartz (定時任務)
  5. Dapper、Dapper.LiteSql (ORM)
  6. Newtonsoft.Json (Json序列化)
  7. ClickHouse.Client (操作ClickHouse)
  8. Oracle.ManagedDataAccess.Core (操作Oracle)
  9. MySqlConnector (操作MySQL)

我最近寫了哪些工程

  1. 大雜燴指令碼工程,包括查詢clickhouse統計分析輸出Excel、查詢MySQL和Oracle、各種小指令碼工具
  2. Blazor工程,做了一套簡單的增刪改查,精力有限,自己測試用,不用手動改資料庫了
  3. 資料探勘服務,主要是Web API和定時任務
  4. Winform工具,用於測試時建立ES索引、生產模擬資料。為什麼寫這個?因為做資料探勘,不給資料,只能自己造了。

為什麼從這篇部落格看起來這個專案只有我一個人在做?沒團隊?

還有專案經理、產品經理、前端等一共幾個人,專案資金投入少,所以不可能有很多人的。

為什麼沒有使用Python?

我一開始是想使用Python的,但就我用.NET寫的這些東西,如果改用Python,沒個2、3年經驗,寫不順暢。

我用.NET做一個專案,Swagger有了,建立工程時自帶的,當然Python的Swagger也是有的,你可以百度"python 從註釋自動生成 swagger",之前看到過一個不錯的,沒儲存,一時半會就找不到了。
用Blazor做了簡單的設定頁面,測試時不用去手動修改資料庫了
寫了一個Mock工程,生產模擬測試資料,寫入速度可以達到6000條/秒(一條資料請求一次,不是批次寫入),介面如下:

最後

寫此部落格是為了給.NET正名,在巨量資料專案中,.NET大有可為。
我寫程式碼沒有用到什麼特別的技術,看起來很簡單,但也不是隨便學學就能寫,沒個3、5年經驗,很難寫的這麼快。
我寫程式碼也沒有什麼條條框框,可能不規範,但很靈活。
例如,winform程式注入紀錄檔工具類怎麼寫?來不急百度了,就這麼寫吧,一樣每秒6000條的狂寫紀錄檔,還不卡介面:

public partial class Form1 : Form, ILog
{
    ...省略

    public Form1()
    {
        InitializeComponent();

        ...省略

        //注入紀錄檔工具類
        ServiceFactory.Get<IndexCreationService>().InjectLog(this);
        ServiceFactory.Get<EsWriteService>().InjectLog(this);
        ServiceFactory.Get<MockDataService>().InjectLog(this);
    }
}

internal class EsWriteService : ServiceBase
{
    ...省略
    private ILog? _log;
    public void InjectLog(ILog log) => _log = log;

    public async Task<bool> Write(string indexName, dynamic esRequestBody)
    {
        ...省略
        _log?.Info("xxx");
        ...省略
    }
}

就目前這些專案、指令碼、工具而言,感覺這就是我寫的最佳實踐。不知道最佳實踐,程式碼也能寫,容易寫成屎山,要麼寫的服務三天兩頭崩。