浅谈工作单元 在整个 ABP 框架当中的应用

ABP在其内部实现了工作单元模式,统一地进行事务与连接管理。 其核心就是通过 Castle 的 Dynamic Proxy 进行动态代理,在组件注册的时候进行拦截器注入,拦截到实现了 Unit Of Work 特性的方法进行操作,在执行完方法之后就会关闭掉工作单元。

其整体流程大概如下:

  • 首先 UOW 相关接口、拦截器等通过 IocManager 注入到 Ioc 容器当中。
  • 监听 Ioc 注册事件,并为其添加方法拦截器。
  • 在拦截器内部使用 using 包裹数据库操作方法,使其成为一个工作单元。
  • 一旦在方法 procced 执行的时候,产生任何异常触发任何异常都不会执行 Complete 方法,直接抛出终止执行。

UnitOfWorkInterceptors

这是一个 Castle Interceptors 的实现,在 AbpBootStrap 的 Initialze 方法当中被注入到 Ioc 容器。

1
2
3
4
5
6
7
8
private void AddInterceptorRegistrars()
{
    ValidationInterceptorRegistrar.Initialize(IocManager);
    AuditingInterceptorRegistrar.Initialize(IocManager);
    EntityHistoryInterceptorRegistrar.Initialize(IocManager);
    UnitOfWorkRegistrar.Initialize(IocManager);
    AuthorizationInterceptorRegistrar.Initialize(IocManager);
}

之后在 Registrar 内部针对组件注入事件进行绑定:

1
2
3
4
5
6
7
8
9
public static void Initialize(IIocManager iocManager)
{
    iocManager.IocContainer.Kernel.ComponentRegistered += (key, handler) =>
    {
        var implementationType = handler.ComponentModel.Implementation.GetTypeInfo();
        HandleTypesWithUnitOfWorkAttribute(implementationType, handler);
        HandleConventionalUnitOfWorkTypes(iocManager, implementationType, handler);
    };
}

在这里的两个方法分别是针对已经实现了 UnitOfWork 特性的类进行绑定,另外一个则是针对符合命名规则的类型进行拦截器绑定。 拦截器做的事情十分简单,针对拦截到的方法进行 UOW 特性检测,如果检测通过之后则直接执行工作单元方法,并根据特性生成 Options。不过不是一个 UOW 的话,则直接继续执行原先方法内代码。

1
2
3
4
5
6
7
8
var unitOfWorkAttr = _unitOfWorkOptions.GetUnitOfWorkAttributeOrNull(method);
if (unitOfWorkAttr == null || unitOfWorkAttr.IsDisabled)
{
    //No need to a uow
    invocation.Proceed();
    return;
}
PerformUow(invocation, unitOfWorkAttr.CreateOptions());

在创建 UOW 的时候,拦截器也会根据方法类型的不同来用不同的方式构建 UOW 。 如果是同步方法的话:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
private void PerformSyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
    // 直接从 Manager 生成一个新的工作单元
    using (var uow = _unitOfWorkManager.Begin(options))
    {
        // 继续执行原方法
        // 产生异常直接进入 uow 的 Dispose 方法
        invocation.Proceed();
        // 如果一切正常,提交事务
        uow.Complete();
    }
}

但如果这个工作单元被标注在异步方法上面,则操作略微复杂:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
    // 获得一个工作单元
    var uow = _unitOfWorkManager.Begin(options);
    // 继续执行拦截到的方法
    try
    {
        invocation.Proceed();
    }
    catch
    {
        uow.Dispose();
        throw;
    }
    // 如果是异步无返回值的方法
    if (invocation.Method.ReturnType == typeof(Task))
    {
        invocation.ReturnValue = InternalAsyncHelper.AwaitTaskWithPostActionAndFinally(
            (Task) invocation.ReturnValue,
            async () => await uow.CompleteAsync(),
            exception => uow.Dispose()
        );
    }
    else //Task<TResult>
    {
        invocation.ReturnValue = InternalAsyncHelper.CallAwaitTaskWithPostActionAndFinallyAndGetResult(
            invocation.Method.ReturnType.GenericTypeArguments[0],
            invocation.ReturnValue,
            async () => await uow.CompleteAsync(),
            exception => uow.Dispose()
        );
    }
}

以上代码在进入的时候直接执行原方法,如果产生任何异常直接进入 Dispose 方法并且抛出异常。乍一看与同步方法处理没什么区别,但重点是这里并没有执行 Complete 方法,因为这里需要对其异步返回结果更改为 UOW 异步提交之后的值,先查看第一种直接返回 Task 的情况.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
public static async Task AwaitTaskWithPostActionAndFinally(Task actualReturnValue, Func<Task> postAction, Action<Exception> finalAction)
{
    Exception exception = null;
    try
    {
        // 原有异步任务返回值
        await actualReturnValue;
        // 新的异步返回结果
        await postAction();
    }
    catch (Exception ex)
    {
        exception = ex;
        throw;
    }
    finally
    {
        finalAction(exception);
    }
}

在内部首先等待原有任务执行完成之后再执行传入的 UOW 的 CompleteAsync() 方法,并且在执行过程中有无异常都会直接调用 UOW 的 Disopse 释放资源。 第二种则适用于有返回值的情况:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static object CallAwaitTaskWithPostActionAndFinallyAndGetResult(Type taskReturnType, object actualReturnValue, Func<Task> action, Action<Exception> finalAction)
{
    // 获得 AwaitTaskWithPreActionAndPostActionAndFinallyAndGetResult 方法,并且通过反射构造一个泛型方法,并且将自身参数传入调用。
    return typeof (InternalAsyncHelper)
        .GetMethod("AwaitTaskWithPostActionAndFinallyAndGetResult", BindingFlags.Public | BindingFlags.Static)
        .MakeGenericMethod(taskReturnType)
        .Invoke(null, new object[] { actualReturnValue, action, finalAction });
}
public static async Task<T> AwaitTaskWithPreActionAndPostActionAndFinallyAndGetResult<T>(Func<Task<T>> actualReturnValue, Func<Task> preAction = null, Func<Task> postAction = null, Action<Exception> finalAction = null)
{
    Exception exception = null;
    try
    {
        if (preAction != null)
        {
            await preAction();
        }
        var result = await actualReturnValue();
        if (postAction != null)
        {
            await postAction();                    
        }
        return result;
    }
    catch (Exception ex)
    {
        exception = ex;
        throw;
    }
    finally
    {
        if (finalAction != null)
        {
            finalAction(exception);
        }
    }
}

这两个方法的作用都是确保 CompleteAsync 和 Dispose 能够在放在异步任务当中执行。

IUnitOfWorkManager

顾名思义,这是一个 UOW 的管理器,在 ABP 内部有其一个默认实现 UnitOfWorkManager,在 AbpKernel 模块初始化的时候就已经被注入。 核心方法是 Begin 方法,在 Begin 方法当中通过 FillDefaultsForNonProvidedOptions 方法判断是否传入了配置参数,如果没有传入的话则构建一个默认参数对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public IUnitOfWorkCompleteHandle Begin(UnitOfWorkOptions options)
{
    // 如果没有 UOW 参数,构造默认参数
    options.FillDefaultsForNonProvidedOptions(_defaultOptions);
    var outerUow = _currentUnitOfWorkProvider.Current;
    // 当前是否已经存在工作单元,存在工作单元的话,构建一个内部工作单元
    if (options.Scope == TransactionScopeOption.Required && outerUow != null)
    {
        return new InnerUnitOfWorkCompleteHandle();
    }
    // 不存在的话构建一个新的工作单元
    var uow = _iocResolver.Resolve<IUnitOfWork>();
    // 绑定各种事件
    uow.Completed += (sender, args) =>
    {
        _currentUnitOfWorkProvider.Current = null;
    };
    uow.Failed += (sender, args) =>
    {
        _currentUnitOfWorkProvider.Current = null;
    };
    uow.Disposed += (sender, args) =>
    {
        _iocResolver.Release(uow);
    };
    //Inherit filters from outer UOW
    if (outerUow != null)
    {
        options.FillOuterUowFiltersForNonProvidedOptions(outerUow.Filters.ToList());
    }
    uow.Begin(options);
    //Inherit tenant from outer UOW
    if (outerUow != null)
    {
        uow.SetTenantId(outerUow.GetTenantId(), false);
    }
    // 设置当前工作单元为新创建的 UOW 
    _currentUnitOfWorkProvider.Current = uow;
    return uow;
}

这里 Begin 方法所返回的是一个 IUnitOfWorkCompleteHandle 对象,跳转到 IUnitOfWorkCompleteHandle,可以看到它有两个方法:

1
2
3
4
5
public interface IUnitOfWorkCompleteHandle : IDisposable
{
    void Complete();
    Task CompleteAsync();
}

都是完成工作单元的方法,一个同步、一个异步,同时这个接口也实现了 IDispose 接口,从开始使用 using 就可以看出其含义。

InnerUnitOfWorkCompleteHandle

参考其引用,可以发现有一个 IUnitOfWork 接口继承自它,并且还有一个 InnerUnitOfWorkCompleteHandle 的内部实现。这里先查看 InnerUnitOfWorkCompleteHandle 内部实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
internal class InnerUnitOfWorkCompleteHandle : IUnitOfWorkCompleteHandle
{
    public const string DidNotCallCompleteMethodExceptionMessage = "Did not call Complete method of a unit of work.";
    private volatile bool _isCompleteCalled;
    private volatile bool _isDisposed;
    public void Complete()
    {
        _isCompleteCalled = true;
    }
    public Task CompleteAsync()
    {
        _isCompleteCalled = true;
        return Task.FromResult(0);
    }
    public void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }
        _isDisposed = true;
        if (!_isCompleteCalled)
        {
            if (HasException())
            {
                return;
            }
            throw new AbpException(DidNotCallCompleteMethodExceptionMessage);
        }
    }
    private static bool HasException()
    {
        try
        {
            return Marshal.GetExceptionCode() != 0;
        }
        catch (Exception)
        {
            return false;
        }
    }
}

其内部实现十分简单,其中 Complete 的同步和异步方法都只是对完成标识进行一个标记。并未真正的进行任何数据库事务操作。同时在他的内部也实现了 IDispose 接口,如果 complete 未被标记为已完成,那么直接抛出异常,后续操作不会执行。 现在再转到 Begin 方法内部就可以发现,在创建的时候会首先判断了当前是否已经存在了工作单元,如果存在了才会创建这样一个内部工作单元。也就是说真正那个的事务操作是在返回的 IUnitOfWork 当中实现的。 这样的话就会构建出一个嵌套的工作单元:

1
OuterUOW->InnerUow1->InnerUOW2->.....->InnerUowN

你可以想象有以下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
public void TestUowMethod()
{
    using(var outerUOW = Manager.Begin())  // 这里返回的是 IOC 解析出的 IUnitOfWork
    {
        OperationOuter();
        using(var innerUOW1 = Manager.Begin())  // 内部 UOW
        {
            Operation1();
            using(var innerUOW2 = Manager.Begin())  // 内部 UOW
            {
                Operation2();
                Complete();
            }
            Complete();
        }
        Complete();
    }
}

当代码执行的时候,如同俄罗斯套娃,从内部依次到外部执行,内部工作单元仅会在调用 Complete 方法的时候将 completed 标记为 true,但一旦操作抛出异常,Complete无法得到执行,则会直接抛出异常,中断外层代码执行。 那么 IUnitOfWork 的实现又是怎样的呢? 在 ABP 内部针对 EF Core 框架实现了一套 UOW,其继承自 UnitOfWorkBase,而在 UnitOfWorkBase 内部有部分针对接口 IActiveUnitOfWork 的实现,同时由于 IUnifOfWork 也实现了 IUnitOfWorkCompleteHandle 接口,所以在 Begin 方法处能够向上转型。

IActiveUnitOfWork

在 IActiveUnitOfWork 接口当中定义了工作单元的三个事件,用户在使用的时候可以针对这三个事件进行绑定。

事件名称 触发条件
Completed 工作单元调用 Complete 方法之后触发
Failed 工作单元在调用 Complete 方法如果产生异常,则会在 Dispose 释放资源时触发。
Disposed 释放工作单元的时候触发

除了三个事件之外,在 ABP 当中的数据过滤器也是在工作单元调用的时候工作的,后面讲解 EfCoreUnitOfWork 的时候会研究,数据过滤仅包括软删除等审计字段,同时也包括多租户的租户 ID 也在工作单元当中进行设置的。 在这里也定义了两个 SaveChanges 方法,一个同步、一个异步,作用跟实体上下文的同名方法作用差不多。

IUnitOfWork

IUnitOfWork 同时继承了 IUnitOfWorkCompleteHandle 与 IActiveUnitOfWork 接口,并且增加了两个属性与一个 Begin 方法。

UnitOfWorkBase

UnitOfWorkBase 是所有工作单元的一个抽象基类,在其内部实现了 IUnitOfWork 的所有方法,其中也包括两个父级接口。 下面就一个一个属性讲解。

Id

这个 Id 是在构造函数当中初始化的,全局唯一的 Id,使用 Guid 初始化其值。

Outer

用于标明当前工作单元的外部工作单元,其值是在 UnitOfWorkManager 创建工作单元的时候赋予的。在 Manager 的 Begin 方法当中每次针对 Current Uow 赋值的时候都会将已经存在的 UOW 关联最新的 Current Uow 的 Outer 属性上面。形成如下结构:

1
Current Uow = Uow3.Outer = Uow2.Outer = Uow1.Outer = null

具体代码参考 ICurrentUnitOfWorkProvider 的实现。

Begin()

在 Manager 创建好一个 Uow 之后,就会调用它的 Begin 方法,在内部首先做了一个判断,判断是否多次调用了 Begin 方法,这里可以看到并未做加锁处理。这是因为在 ABP 当中,一个线程当中共用一个工作单元。其实在 CurrentProvider 的代码当中就可以看出来如果在一个线程里面创建了多个工作单元,他会将其串联起来。 之后设置过滤器,并且开始调用 BeginUow 方法,这里并未实现,具体实现我们转到 EfUnitOfWork 可以看到。

BeginUow()-EfCoreUnitOfWork

1
2
3
4
5
6
7
protected override void BeginUow()
{
    if (Options.IsTransactional == true)
    {
        _transactionStrategy.InitOptions(Options);
    }
}

覆写了父类方法,仅对设置进行初始化。 其实到这里我们就大概清楚了 ABP 整个 UOW 的工作流程,如果是两个打上 UnitOfWork 特性的方法在 A 调用 B 的时候其实就会封装成两个嵌套的 using 块。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
using(var a = Manager.Begin())
{
    // 操作
    using(var b = Manager.Begin())
    {
        // 操作
        b.Complete();
    }
    a.Complete();
}

而最外层的 Complete 就是真正执行了数据库事务操作的。 可以看下 EfUnitOfWork 的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 这里是 UnitOfWorkBase 的 Complete 方法
public void Complete()
{
    PreventMultipleComplete();
    try
    {
        CompleteUow();
        _succeed = true;
        OnCompleted();
    }
    catch (Exception ex)
    {
        _exception = ex;
        throw;
    }
}
// 这里是 EfUnitOfWork 的 CompleteUow 方法
protected override void CompleteUow()
{
    SaveChanges();
    CommitTransaction();
}
// 遍历所有激活的 DbContext 并保存更改到数据库
public override void SaveChanges()
{
    foreach (var dbContext in GetAllActiveDbContexts())
    {
        SaveChangesInDbContext(dbContext);
    }
}
// 提交事务
private void CommitTransaction()
{
    if (Options.IsTransactional == true)
    {
        _transactionStrategy.Commit();
    }
}
Built with Hugo
主题 StackJimmy 设计