大家好,我是Edison。
最近工作中需要用到MongoDB的事務操作,因此參考了一些資料封裝了一個小的元件,提供基礎的CRUD Repository基礎類別 和 UnitOfWork工作單元模式。今天,就來簡單介紹一下這個小元件。
MongoDB在4.2版本開始全面支援了多檔案事務,至今已過了四年了,雖然我們可能沒有在專案中用MongoDB來替代傳統關係型資料庫如MySQL/SQL Server,但是不能否認MongoDB已經在事務能力上愈發成熟了。
在MongoDB中,所謂的事務主要指的是多個檔案的事務,其使用方式和傳統關係型資料庫差不多。但我們需要注意的是:多檔案事務只能應用在副本集 或 mongos 節點上。如果你只是一個單點的mongo範例,是無法進行多檔案事務實踐的。
畫外音:如果你對MongoDB感興趣,不妨看看我的這個系列部落格:《MongoDB入門到實踐學習之旅》
那麼,如何快速進行事務操作呢?
下面演示瞭如何通過Mongo Shell來進行一個多檔案操作的事務提交:
var session = db.getMongo().startSession(); session.startTransaction({readConcern: { level: 'majority' },writeConcern: { w: 'majority' }}); var coll1 = session.getDatabase('students').getCollection('teams'); coll1.update({name: 'yzw-football-team'}, {$set: {members: 20}}); var coll2 = session.getDatabase('students').getCollection('records'); coll1.update({name: 'Edison'}, {$set: {gender: 'Female'}}); // 成功提交事務 session.commitTransaction(); // 失敗事務回滾 session.abortTransaction();
下面展示了在.NET應用中通過MongoDB Driver來進行事務的範例:
using (var clientSession = mongoClient.StartSession()) { try { var contacts = clientSession.Client.GetDatabase("testDB").GetCollection<Contact>("contacts"); contacts.ReplaceOne(contact => contact.Id == "1234455", contact); var books = clientSession.Client.GetDatabase("testDB").GetCollection<Book>("books"); books.DeleteOne(book => book.Id == "1234455"); clientSession.CommitTransaction(); } catch (Exception ex) { // to do some logging clientSession.AbortTransaction(); } }
在大部分的實際應用中,我們通常都習慣使用資料倉儲(Repository)的模式來進行CRUD,同時也習慣用工作單元(UnitOfWork)模式來進行協調多個Repository進行事務提交。那麼,如何在自己的專案中實現這個呢?
參考了一些資料後,自己實現了一個基礎小元件,暫且叫它:EDT.MongoProxy吧,我們來看看它是如何實現的。
基於MongoDB的最佳時間,對於MongoClient最好設定為單例注入,因為在MongoDB.Driver中MongoClient已經被設計為執行緒安全可以被多執行緒共用,這樣可還以避免反覆範例化MongoClient帶來的開銷,避免在極端情況下的效能低下。
這裡暫且設計一個MongoDbConnection類,用於包裹這個MongoClient,然後將其以單例模式注入IoC容器中。
public class MongoDbConnection : IMongoDbConnection { public IMongoClient DatabaseClient { get; } public string DatabaseName { get; } public MongoDbConnection(MongoDatabaseConfigs configs, IConfiguration configuration) { DatabaseClient = new MongoClient(configs.GetMongoClientSettings(configuration)); DatabaseName = configs.DatabaseName; } }
其中,這個MongoDatabaseConfigs類主要是獲取appsettings中的設定,用以生成MongoClient的對應Settings。
/** Config Example "MongoDatabaseConfigs": { "Servers": "xxx01.edisontalk.net,xxx02.edisontalk.net,xxx03.edisontalk.net", "Port": 27017, "ReplicaSetName": "edt-replica", "DatabaseName": "EDT_Practices", "AuthDatabaseName": "admin", "ApplicationName": "Todo", "UserName": "service_testdev", "Password": "xxxxxxxxxxxxxxxxxxxxxxxx", "UseTLS": true, "AllowInsecureTLS": true, "SslCertificatePath": "/etc/pki/tls/certs/EDT_CA.cer", "UseEncryption": true } **/ public class MongoDatabaseConfigs { private const string DEFAULT_AUTH_DB = "admin"; // Default AuthDB: admin public string Servers { get; set; } public int Port { get; set; } = 27017; // Default Port: 27017 public string ReplicaSetName { get; set; } public string DatabaseName { get; set; } public string DefaultCollectionName { get; set; } = string.Empty; public string ApplicationName { get; set; } public string UserName { get; set; } public string Password { get; set; } public string AuthDatabaseName { get; set; } = DEFAULT_AUTH_DB; // Default AuthDB: admin public string CustomProperties { get; set; } = string.Empty; public bool UseTLS { get; set; } = false; public bool AllowInsecureTLS { get; set; } = true; public string SslCertificatePath { get; set; } = string.Empty; public bool StoreCertificateInKeyStore { get; set; } = false; public MongoClientSettings GetMongoClientSettings(IConfiguration configuration = null) { if (string.IsNullOrWhiteSpace(Servers)) throw new ArgumentNullException("Mongo Servers Configuration is Missing!"); if (string.IsNullOrWhiteSpace(UserName) || string.IsNullOrWhiteSpace(Password)) throw new ArgumentNullException("Mongo Account Configuration is Missing!"); // Base Configuration MongoClientSettings settings = new MongoClientSettings { ApplicationName = ApplicationName, ReplicaSetName = ReplicaSetName }; // Credential settings.Credential = MongoCredential.CreateCredential(AuthDatabaseName, UserName, Password); // Servers var mongoServers = Servers.Split(",", StringSplitOptions.RemoveEmptyEntries).ToList(); if (mongoServers.Count == 1) // Standalone { settings.Server = new MongoServerAddress(mongoServers.First(), Port); settings.DirectConnection = true; } if (mongoServers.Count > 1) // Cluster { var mongoServerAddresses = new List<MongoServerAddress>(); foreach (var mongoServer in mongoServers) { var mongoServerAddress = new MongoServerAddress(mongoServer, Port); mongoServerAddresses.Add(mongoServerAddress); } settings.Servers = mongoServerAddresses; settings.DirectConnection = false; } // SSL if (UseTLS) { settings.UseTls = true; settings.AllowInsecureTls = AllowInsecureTLS; if (string.IsNullOrWhiteSpace(SslCertificatePath)) throw new ArgumentNullException("SslCertificatePath is Missing!"); if (StoreCertificateInKeyStore) { var localTrustStore = new X509Store(StoreName.Root); var certificateCollection = new X509Certificate2Collection(); certificateCollection.Import(SslCertificatePath); try { localTrustStore.Open(OpenFlags.ReadWrite); localTrustStore.AddRange(certificateCollection); } catch (Exception ex) { throw; } finally { localTrustStore.Close(); } } var certs = new List<X509Certificate> { new X509Certificate2(SslCertificatePath) }; settings.SslSettings = new SslSettings(); settings.SslSettings.ClientCertificates = certs; settings.SslSettings.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls13; } return settings; } }
這裡我們主要仿照DbContext的設計,設計一個MongoDbContext,它從IoC容器中獲取到單例的MongoClient,封裝了事務的開啟和提交,簡化了應用程式碼的編寫。
public class MongoDbContext : IMongoDbContext { private readonly IMongoDatabase _database; private readonly IMongoClient _mongoClient; private readonly IList<Func<IClientSessionHandle, Task>> _commands = new List<Func<IClientSessionHandle, Task>>(); public MongoDbContext(IMongoDbConnection dbClient) { _mongoClient = dbClient.DatabaseClient; _database = _mongoClient.GetDatabase(dbClient.DatabaseName); } public void AddCommand(Func<IClientSessionHandle, Task> func) { _commands.Add(func); } public async Task AddCommandAsync(Func<IClientSessionHandle, Task> func) { _commands.Add(func); await Task.CompletedTask; } /// <summary> /// NOTES: Only works in Cluster mode /// </summary> public int Commit(IClientSessionHandle session) { try { session.StartTransaction(); foreach (var command in _commands) { command(session); } session.CommitTransaction(); return _commands.Count; } catch (Exception ex) { session.AbortTransaction(); return 0; } finally { _commands.Clear(); } } /// <summary> /// NOTES: Only works in Cluster mode /// </summary> public async Task<int> CommitAsync(IClientSessionHandle session) { try { session.StartTransaction(); foreach (var command in _commands) { await command(session); } await session.CommitTransactionAsync(); return _commands.Count; } catch (Exception ex) { await session.AbortTransactionAsync(); return 0; } finally { _commands.Clear(); } } public IClientSessionHandle StartSession() { var session = _mongoClient.StartSession(); return session; } public async Task<IClientSessionHandle> StartSessionAsync() { var session = await _mongoClient.StartSessionAsync(); return session; } public IMongoCollection<T> GetCollection<T>(string name) { return _database.GetCollection<T>(name); } public void Dispose() { GC.SuppressFinalize(this); } }
在實際專案中,我們都希望有一個基礎的RepositoryBase類,將CRUD的方法都封裝了,我們實際中就只需要建立一個對應的Repository整合這個RepositoryBase就行了,無需再重複編寫CRUD的方法。那麼,也就有了這個MongoRepositoryBase類:
public class MongoRepositoryBase<TEntity> : IMongoRepositoryBase<TEntity> where TEntity : MongoEntityBase, new() { protected readonly IMongoDbContext _dbContext; protected readonly IMongoCollection<TEntity> _dbSet; private readonly string _collectionName; private const string _keyField = "_id"; public MongoRepositoryBase(IMongoDbContext mongoDbContext) { _dbContext = mongoDbContext; _collectionName = typeof(TEntity).GetAttributeValue((TableAttribute m) => m.Name) ?? typeof(TEntity).Name; if (string.IsNullOrWhiteSpace(_collectionName)) throw new ArgumentNullException("Mongo DatabaseName can't be NULL! Please set the attribute Table in your entity class."); _dbSet = mongoDbContext.GetCollection<TEntity>(_collectionName); } #region Create Part public async Task AddAsync(TEntity entity, IClientSessionHandle session = null) { if (session == null) await _dbSet.InsertOneAsync(entity); else await _dbContext.AddCommandAsync(async (session) => await _dbSet.InsertOneAsync(entity)); } public async Task AddManyAsync(IEnumerable<TEntity> entityList, IClientSessionHandle session = null) { if (session == null) await _dbSet.InsertManyAsync(entityList); else await _dbContext.AddCommandAsync(async (session) => await _dbSet.InsertManyAsync(entityList)); } #endregion # region Delete Part public async Task DeleteAsync(string id, IClientSessionHandle session = null) { if (session == null) await _dbSet.DeleteOneAsync(Builders<TEntity>.Filter.Eq(_keyField, new ObjectId(id))); else await _dbContext.AddCommandAsync(async (session) => await _dbSet.DeleteOneAsync(Builders<TEntity>.Filter.Eq(_keyField, new ObjectId(id)))); } public async Task DeleteAsync(Expression<Func<TEntity, bool>> expression, IClientSessionHandle session = null) { if (session == null) await _dbSet.DeleteOneAsync(expression); else await _dbContext.AddCommandAsync(async (session) => await _dbSet.DeleteOneAsync(expression)); } public async Task<DeleteResult> DeleteManyAsync(FilterDefinition<TEntity> filter, IClientSessionHandle session = null) { if (session == null) return await _dbSet.DeleteManyAsync(filter); await _dbContext.AddCommandAsync(async (session) => await _dbSet.DeleteManyAsync(filter)); return new DeleteResult.Acknowledged(10); } public async Task<DeleteResult> DeleteManyAsync(Expression<Func<TEntity, bool>> expression, IClientSessionHandle session = null) { if (session == null) return await _dbSet.DeleteManyAsync(expression); await _dbContext.AddCommandAsync(async (session) => await _dbSet.DeleteManyAsync(expression)); return new DeleteResult.Acknowledged(10); } #endregion #region Update Part public async Task UpdateAsync(TEntity entity, IClientSessionHandle session = null) { if (session == null) await _dbSet.ReplaceOneAsync(item => item.Id == entity.Id, entity); else await _dbContext.AddCommandAsync(async (session) => await _dbSet.ReplaceOneAsync(item => item.Id == entity.Id, entity)); } public async Task UpdateAsync(Expression<Func<TEntity, bool>> expression, Expression<Action<TEntity>> entity, IClientSessionHandle session = null) { var fieldList = new List<UpdateDefinition<TEntity>>(); if (entity.Body is MemberInitExpression param) { foreach (var item in param.Bindings) { var propertyName = item.Member.Name; object propertyValue = null; if (item is not MemberAssignment memberAssignment) continue; if (memberAssignment.Expression.NodeType == ExpressionType.Constant) { if (memberAssignment.Expression is ConstantExpression constantExpression) propertyValue = constantExpression.Value; } else { propertyValue = Expression.Lambda(memberAssignment.Expression, null).Compile().DynamicInvoke(); } if (propertyName != _keyField) { fieldList.Add(Builders<TEntity>.Update.Set(propertyName, propertyValue)); } } } if (session == null) await _dbSet.UpdateOneAsync(expression, Builders<TEntity>.Update.Combine(fieldList)); else await _dbContext.AddCommandAsync(async (session) => await _dbSet.UpdateOneAsync(expression, Builders<TEntity>.Update.Combine(fieldList))); } public async Task UpdateAsync(FilterDefinition<TEntity> filter, UpdateDefinition<TEntity> update, IClientSessionHandle session = null) { if (session == null) await _dbSet.UpdateOneAsync(filter, update); else await _dbContext.AddCommandAsync(async (session) => await _dbSet.UpdateOneAsync(filter, update)); } public async Task UpdateManyAsync(Expression<Func<TEntity, bool>> expression, UpdateDefinition<TEntity> update, IClientSessionHandle session = null) { if (session == null) await _dbSet.UpdateManyAsync(expression, update); else await _dbContext.AddCommandAsync(async (session) => await _dbSet.UpdateManyAsync(expression, update)); } public async Task<UpdateResult> UpdateManayAsync(Dictionary<string, string> dic, FilterDefinition<TEntity> filter, IClientSessionHandle session = null) { var t = new TEntity(); // Fields to be updated var list = new List<UpdateDefinition<TEntity>>(); foreach (var item in t.GetType().GetProperties()) { if (!dic.ContainsKey(item.Name)) continue; var value = dic[item.Name]; list.Add(Builders<TEntity>.Update.Set(item.Name, value)); } var updatefilter = Builders<TEntity>.Update.Combine(list); if (session == null) return await _dbSet.UpdateManyAsync(filter, updatefilter); await _dbContext.AddCommandAsync(async (session) => await _dbSet.UpdateManyAsync(filter, updatefilter)); return new UpdateResult.Acknowledged(10, 10, null); } #endregion #region Read Part public async Task<TEntity> GetAsync(Expression<Func<TEntity, bool>> expression, bool readFromPrimary = true) { var readPreference = GetReadPreference(readFromPrimary); var queryData = await _dbSet.WithReadPreference(readPreference) .Find(expression) .FirstOrDefaultAsync(); return queryData; } public async Task<TEntity> GetAsync(string id, bool readFromPrimary = true) { var readPreference = GetReadPreference(readFromPrimary); var queryData = await _dbSet.WithReadPreference(readPreference).FindAsync(Builders<TEntity>.Filter.Eq(_keyField, new ObjectId(id))); return queryData.FirstOrDefault(); } public async Task<IEnumerable<TEntity>> GetAllAsync(bool readFromPrimary = true) { var readPreference = GetReadPreference(readFromPrimary); var queryAllData = await _dbSet.WithReadPreference(readPreference).FindAsync(Builders<TEntity>.Filter.Empty); return queryAllData.ToList(); } public async Task<long> CountAsync(Expression<Func<TEntity, bool>> expression, bool readFromPrimary = true) { var readPreference = GetReadPreference(readFromPrimary); return await _dbSet.WithReadPreference(readPreference).CountDocumentsAsync(expression); } public async Task<long> CountAsync(FilterDefinition<TEntity> filter, bool readFromPrimary = true) { var readPreference = GetReadPreference(readFromPrimary); return await _dbSet.WithReadPreference(readPreference).CountDocumentsAsync(filter); } public async Task<bool> ExistsAsync(Expression<Func<TEntity, bool>> predicate, bool readFromPrimary = true) { var readPreference = GetReadPreference(readFromPrimary); return await Task.FromResult(_dbSet.WithReadPreference(readPreference).AsQueryable().Any(predicate)); } public async Task<List<TEntity>> FindListAsync(FilterDefinition<TEntity> filter, string[]? field = null, SortDefinition<TEntity>? sort = null, bool readFromPrimary = true) { var readPreference = GetReadPreference(readFromPrimary); if (field == null || field.Length == 0) { if (sort == null) return await _dbSet.WithReadPreference(readPreference).Find(filter).ToListAsync(); return await _dbSet.WithReadPreference(readPreference).Find(filter).Sort(sort).ToListAsync(); } var fieldList = new List<ProjectionDefinition<TEntity>>(); for (int i = 0; i < field.Length; i++) { fieldList.Add(Builders<TEntity>.Projection.Include(field[i].ToString())); } var projection = Builders<TEntity>.Projection.Combine(fieldList); fieldList?.Clear(); if (sort == null) return await _dbSet.WithReadPreference(readPreference).Find(filter).Project<TEntity>(projection).ToListAsync(); return await _dbSet.WithReadPreference(readPreference).Find(filter).Sort(sort).Project<TEntity>(projection).ToListAsync(); } public async Task<List<TEntity>> FindListByPageAsync(FilterDefinition<TEntity> filter, int pageIndex, int pageSize, string[]? field = null, SortDefinition<TEntity>? sort = null, bool readFromPrimary = true) { var readPreference = GetReadPreference(readFromPrimary); if (field == null || field.Length == 0) { if (sort == null) return await _dbSet.WithReadPreference(readPreference).Find(filter).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); return await _dbSet.WithReadPreference(readPreference).Find(filter).Sort(sort).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); } var fieldList = new List<ProjectionDefinition<TEntity>>(); for (int i = 0; i < field.Length; i++) { fieldList.Add(Builders<TEntity>.Projection.Include(field[i].ToString())); } var projection = Builders<TEntity>.Projection.Combine(fieldList); fieldList?.Clear(); if (sort == null) return await _dbSet.WithReadPreference(readPreference).Find(filter).Project<TEntity>(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); return await _dbSet.WithReadPreference(readPreference).Find(filter).Sort(sort).Project<TEntity>(projection).Skip((pageIndex - 1) * pageSize).Limit(pageSize).ToListAsync(); } #endregion #region Protected Methods protected ReadPreference GetReadPreference(bool readFromPrimary) { if (readFromPrimary) return ReadPreference.PrimaryPreferred; else return ReadPreference.SecondaryPreferred; } #endregion }
在實際專案中,在對多個Repository操作之後,我們希望有一個統一的提交操作來實現事務的原子性。因此,我們可以有一個UnitOfWork來作為代理:
public class UnitOfWork : IUnitOfWork { private readonly IMongoDbContext _context; public UnitOfWork(IMongoDbContext context) { _context = context; } public bool SaveChanges(IClientSessionHandle session) { return _context.Commit(session) > 0; } public async Task<bool> SaveChangesAsync(IClientSessionHandle session) { return await _context.CommitAsync(session) > 0; } public IClientSessionHandle BeginTransaction() { return _context.StartSession(); } public async Task<IClientSessionHandle> BeginTransactionAsync() { return await _context.StartSessionAsync(); } public void Dispose() { _context.Dispose(); } }
為了便於應用中快速注入,我們可以簡單封裝一個擴充套件方法,快速注入相關的核心組成部分:
public static class ServiceCollectionExtensions { /// <summary> /// MongoDB Config Injection /// </summary> public static IServiceCollection AddMongoProxy(this IServiceCollection services, IConfiguration configuration) { if (!configuration.GetSection(nameof(MongoDatabaseConfigs)).Exists()) return services; services.Configure<MongoDatabaseConfigs>(configuration.GetSection(nameof(MongoDatabaseConfigs))); services.AddSingleton(sp => sp.GetRequiredService<IOptions<MongoDatabaseConfigs>>().Value); services.AddSingleton<IMongoDbConnection, MongoDbConnection>(); services.AddScoped<IMongoDbContext, MongoDbContext>(); services.AddScoped<IUnitOfWork, UnitOfWork>(); return services; } }
在appsettings中設定MongoDB的連線資訊:
"MongoDatabaseConfigs": { "Servers": "xxx01.edisontalk.net,xxx02.edisontalk.net,xxx03.edisontalk.net", "Port": 27017, "ReplicaSetName": "edt-replica", "DatabaseName": "EDT_Practices", "UserName": "xxxxxxxxxxxxx", "Password": "xxxxxxxxxxxxx" }
然後通過擴充套件方法注入MongoProxy相關部分:
builder.Services.AddMongoProxy(builder.Configuration);
範例Entity:
[Table("Orders")] public class OrderEntity : MongoEntityBase, IEntity { public string OrderNumber { get; set; } public List<TransmissionEntity> Transmissions { get; set; } }
範例Repository:
public interface ITodoItemRepository : IMongoRepositoryBase<TodoItem> { } public class TodoItemRepository : MongoRepositoryBase<TodoItem>, ITodoItemRepository { public TodoItemRepository(IMongoDbContext mongoDbContext) : base(mongoDbContext) { } } services.AddScoped<ITodoItemRepository, TodoItemRepository>(); services.AddScoped<IOrderRepository, OrderRepository>(); ......
第三步:使用Repository 和 UnitOfWork
# 非事務模式 await _taskRepository.AddManyAsync(newTasks); # 事務模式(藉助UnitOfWork工作單元) private readonly IUnitOfWork _unitOfWork; public OrderService(IUnitOfWork unitOfWork, ......) { _unitOfWork = unitOfWork; ...... } public async Task Example() { using var session = await _unitOfWork.BeginTransactionAsync()) await _taskRepository.AddManyAsync(newTasks, session); await _orderRepository.AddAsync(newOrder, session); await _unitOfWork.SaveChangesAsync(session); }
本文介紹了MongoDB事務的基本概念和如何通過.NET操作事務,重點介紹了EDT.MongoProxy這個小元件的設計,讓我們可以在ASP.NET 6應用中通過資料倉儲(Repository)和工作單元(UnitOfWork)的模式來快速方便地操作MongoDB的事務。
本文程式碼並未提供所有的,如需檢視,請至下面的程式碼倉庫中檢視,也可以點個贊給點鼓勵。
GitHub:https://github.com/Coder-EdisonZhou/EDT.MongoProxy
追逐時光者,《.NET Core MongoDB資料倉儲和工作單元實操》 *本文主要設計參考自這篇文章,值得一讀!
TheCodeBuzz,《MongoDB Repository Implementation in .NET Core》:
Bryan Avery, 《ASP.NET Core - MongoDB Repository Pattern & Unit Of Work》: