EdgeSync ServiceFramework AspNetCore.Mvc

概覽

EdgeSync ServiceFramework AspNetCore.Mvc 提供了一個強大的框架,用於在 ASP.NET Core 應用程式中自動化 NATS 通訊模式的配置。框架支援兩種主要的通訊模式:Request/ResponsePub/Sub,並可搭配 NATS JetStream 或 Classic NATS 使用。

核心架構

1. 自動模式決策機制

框架使用 ConventionDecisionMaker 根據決策樹自動選擇最適合的通訊模式:

flowchart TD
    A["方法是否有返回值"] --> B["有"]
    A --> C["無"]
    
    B --> B1["檢查 JetStreamPullAttribute"]
    B1 --> B2["有 → 錯誤"]
    B1 --> B3["無 → Request/Response"]
    
    C --> E["參數是否為 Collection"]
    E --> F["是 → Pub/Sub Push"]
    E --> G["否"]
    
    G --> H["檢查 JetStreamPullAttribute"]
    H --> I["有 → Pub/Sub Pull"]
    H --> J["無 → Pub/Sub Push"]

2. 支援的通訊模式

模式說明使用場景
RequestResponse請求-回應模式API 呼叫、資料查詢
PubSubPushJetStreamJetStream Push 模式事件發布、廣播通知
PubSubPullJetStreamJetStream Pull 模式工作佇列、批次處理
PubSubPushClassicClassic NATS Push輕量級事件通知

Request/Response 實作

基本概念

Request/Response 模式適用於需要同步回應的場景,如 API 呼叫、資料查詢等。

自動 Convention 方式

[ApiController]
[Route("api/[controller]")]
public class BooksController : ControllerBase
{
    private readonly IBookAppService _bookService;

    public BooksController(IBookAppService bookService)
    {
        _bookService = bookService;
    }

    // 自動判斷為 RequestResponse 模式(有返回值)
    [Subject("getBook")]
    public async Task<ErrorOr<BookDto>> GetBookAsync(Guid id)
    {
        return await _bookService.GetAsync(id);
    }

    // HTTP GET 方法,Query 參數綁定
    [Subject("searchBooks")]
    public async Task<ErrorOr<List<BookDto>>> SearchBooksAsync(string keyword, int page = 1)
    {
        return await _bookService.SearchAsync(keyword, page);
    }

    // HTTP POST 方法,Body 參數綁定
    [Subject("createBook")]
    public async Task<ErrorOr<BookDto>> CreateBookAsync([FromBody] CreateBookDto request)
    {
        return await _bookService.CreateAsync(request);
    }
}

HTTP 方法自動對應

框架會根據方法名稱自動決定 HTTP 方法:

  • Get*, Find*, Search*GET
  • Create*, Add*POST
  • Update*, Modify*PUT
  • Delete*, Remove*DELETE
  • 其他 → POST(預設)

參數綁定邏輯

  • 路由參數: 如果參數名稱出現在路由模板中,使用 [FromRoute]
  • 簡單類型: int, string, Guid, DateTime 等使用 [FromQuery]
  • 複雜類型: 自定義物件使用 [FromBody]

手動 ServiceHandler 方式

public class BookServiceHandler : ServiceHandler
{
    private readonly IBookAppService _bookAppService;

    public override string ServiceName => "bookstore";
    public override string ServiceVersion => "1.0.0";
    public override string QueueGroup => "bookstore-queue";

    public BookServiceHandler(
        ILogger<BookServiceHandler> logger,
        IJetStreamClientFactory factory,
        IBookAppService bookAppService)
        : base(logger, factory, "bus")
    {
        _bookAppService = bookAppService;
    }

    [Subject("getBook", "bookstore.books.*.get")]
    public async ValueTask GetBookAsync(ServiceMsgContext<string> context, string message)
    {
        try
        {
            var id = context.Subject.Split('.')[2];
            if (!Guid.TryParse(id, out Guid guid))
            {
                var errorResponse = CreateErrorResponse("Invalid request: Id is required");
                await context.ServiceMsg.ReplyAsync(errorResponse);
                return;
            }

            var result = await _bookAppService.GetAsync(guid);
            if (result.IsError)
            {
                var errorResponse = CreateErrorResponse(
                    string.Join(", ", result.Errors.Select(e => e.Description)));
                await context.ServiceMsg.ReplyAsync(errorResponse);
            }
            else
            {
                var successResponse = CreateSuccessResponse(result.Value);
                await context.ServiceMsg.ReplyAsync(successResponse);
            }
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "Error in GetBookAsync");
            var errorResponse = CreateErrorResponse($"Internal error: {ex.Message}");
            await context.ServiceMsg.ReplyAsync(errorResponse);
        }
    }

    private string CreateSuccessResponse(object data)
    {
        var response = new NatsResponse<object>
        {
            IsSuccess = true,
            Data = data,
            Error = null
        };
        return JsonSerializer.Serialize(response);
    }

    private string CreateErrorResponse(string error)
    {
        var response = new NatsResponse<object>
        {
            IsSuccess = false,
            Data = null,
            Error = error
        };
        return JsonSerializer.Serialize(response);
    }
}

Pub/Sub 實作

基本概念

Pub/Sub 模式適用於事件驅動的架構,支援以下子模式:

  1. Push Mode: 訊息主動推送給訂閱者
  2. Pull Mode: 訂閱者主動拉取訊息
  3. JetStream: 提供持久化、重播、確認機制
  4. Classic: 輕量級、無持久化

Push Mode(事件發布)

自動 Convention 方式

[ApiController]
[Route("api/[controller]")]
public class EventsController : ControllerBase
{
    // 無返回值 → 自動判斷為 PubSub Push 模式
    [Subject("bookRefill")]
    [JetStream(enable: true)] // 明確啟用 JetStream
    public async Task PublishBookRefillAsync([FromBody] BookRefillEvent refillEvent)
    {
        // 發布事件邏輯
        await _eventPublisher.PublishAsync("bookstore.events.book.refill", refillEvent);
    }

    // Collection 參數 → 自動判斷為 PubSub Push 模式
    [Subject("batchProcessBooks")]
    public async Task ProcessBooksAsync([FromBody] List<BookDto> books)
    {
        // 批次處理邏輯
        foreach (var book in books)
        {
            await _bookProcessor.ProcessAsync(book);
        }
    }
}

手動 ServiceHandler 方式

public class BookEventHandler : NatsService
{
    private readonly IBookAppService _bookService;

    public BookEventHandler(ILogger<BookEventHandler> logger, IBookAppService bookService)
        : base(logger)
    {
        _bookService = bookService;
    }

    // Push 模式:接收事件並處理
    [Subject("handleBookRefill", "bookstore.events.book.refill")]
    public async Task HandleBookRefillAsync(BookRefillEvent refillEvent)
    {
        Logger.LogInformation("Received book refill event for BookId: {BookId}", refillEvent.BookId);
        
        var result = await _bookService.RefillStockAsync(refillEvent.BookId, refillEvent.RefillQuantity);
        
        if (result.IsError)
        {
            Logger.LogError("Failed to process refill event: {Errors}", 
                string.Join(", ", result.Errors.Select(e => e.Description)));
        }
    }
}

Pull Mode(工作佇列)

自動 Convention 方式

[ApiController]
[Route("api/[controller]")]
public class WorkersController : ControllerBase
{
    // 使用 JetStreamPullAttribute → 自動判斷為 Pull 模式
    [Subject("processBookOrders")]
    [JetStreamPull(
        consumerName: "book-order-processor",
        consumerGroup: "order-workers",
        maxMessages: 10,
        ackPolicy: "Explicit")]
    public async Task ProcessBookOrdersAsync([FromBody] BookOrderTask task)
    {
        // 處理單個訂單任務
        await _orderProcessor.ProcessAsync(task);
    }

    // Collection + JetStreamPull → 批次拉取處理
    [Subject("processBatchOrders")]
    [JetStreamPull(
        consumerName: "batch-order-processor",
        maxMessages: 50)]
    public async Task ProcessBatchOrdersAsync([FromBody] List<BookOrderTask> tasks)
    {
        // 批次處理訂單
        await Task.WhenAll(tasks.Select(_orderProcessor.ProcessAsync));
    }
}

手動 BaseEventHandler 方式

public class BookOrderProcessor : BaseEventHandler
{
    private readonly IOrderService _orderService;

    public BookOrderProcessor(
        ILogger<BaseEventHandler> logger,
        IJetStreamClientFactory factory,
        IOrderService orderService)
        : base(logger, factory, "broker")
    {
        _orderService = orderService;
    }

    protected override string SubjectName => "bookstore.orders.process";
    protected override string StreamName => "bookstore-orders";
    protected override string ConsumerName => "order-processor";

    // JetStream 配置
    protected override JetStreamConfigOptions JStreamCfgOpts { get; set; } = new()
    {
        MaxMsgs = -1,
        MaxBytes = -1,
        MaxAge = TimeSpan.FromDays(7),
        Description = "Book order processing stream",
        Retention = StreamConfigRetention.Limits,
        Storage = StreamConfigStorage.File,
    };

    // Consumer 配置
    protected override ConsumerConfigOptions ConsumerCfgOpts { get; set; } = new()
    {
        AckPolicy = ConsumerConfigAckPolicy.Explicit,
        ReplayPolicy = ConsumerConfigReplayPolicy.Instant,
        MaxAckPending = 100,
    };

    protected override async Task HandleInputEventCore(byte[] message, string subject)
    {
        try
        {
            var messageJson = Encoding.UTF8.GetString(message);
            var orderTask = JsonSerializer.Deserialize<BookOrderTask>(messageJson);

            if (orderTask == null)
            {
                Logger.LogWarning("Failed to deserialize order task from subject: {Subject}", subject);
                return;
            }

            Logger.LogInformation("Processing order task: {OrderId}", orderTask.OrderId);
            
            var result = await _orderService.ProcessOrderAsync(orderTask);
            
            if (result.IsError)
            {
                Logger.LogError("Failed to process order {OrderId}: {Errors}",
                    orderTask.OrderId, string.Join(", ", result.Errors.Select(e => e.Description)));
                // 可以選擇重新排隊或發送到 DLQ
                return;
            }

            Logger.LogInformation("Successfully processed order: {OrderId}", orderTask.OrderId);
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, "Unexpected error processing order from subject: {Subject}", subject);
        }
    }
}

模式配置與屬性

Subject 屬性

[Subject("endpointName", "custom.subject.pattern")]
public async Task MethodAsync(...)
  • endpointName: API 端點名稱
  • customSubject: 自定義 NATS subject 模式

JetStream 屬性

[JetStream(enable: true)]  // 明確啟用 JetStream
[JetStream(enable: false)] // 明確禁用 JetStream(使用 Classic NATS)

JetStream Pull 屬性

[JetStreamPull(
    consumerName: "my-consumer",
    consumerGroup: "worker-group",
    maxMessages: 10,
    ackPolicy: "Explicit",
    createConsumerIfNotExists: true)]

參數綁定屬性

public async Task MethodAsync(
    [FromRoute] Guid id,           // 路由參數
    [FromQuery] string filter,     // 查詢參數
    [FromBody] CreateDto request)  // 請求本體

配置選項

AutoConventionOptions

services.Configure<AutoConventionOptions>(options =>
{
    options.DefaultJetStreamEnable = true;  // 預設啟用 JetStream
    options.RequestTimeout = TimeSpan.FromSeconds(30);
    options.RetryCount = 3;
    options.ErrorHandlingMode = "Detailed";
});

服務註冊

public void ConfigureServices(IServiceCollection services)
{
    // 註冊 ServiceFramework
    services.AddServiceFramework(options =>
    {
        options.NatsUrl = "nats://localhost:4222";
        options.DefaultJetStreamEnable = true;
    });

    // 註冊 Auto Convention
    services.AddAutoConvention(options =>
    {
        options.EnableAutoRouting = true;
        options.RoutePrefix = "api/nats";
    });

    // 註冊應用服務
    services.AddScoped<IBookAppService, BookAppService>();
    
    // 註冊 ServiceHandler(手動模式)
    services.AddHostedService<BookServiceHandler>();
    services.AddHostedService<BookEventHandler>();
}

背景服務機制

ServiceFrameworkBackgroundService

框架會自動掃描並啟動所有繼承自 NatsService 的服務:

public class ServiceFrameworkBackgroundService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // 1. 掃描所有 NatsService 實作
        var serviceTypes = AppDomain.CurrentDomain.GetAssemblies()
            .SelectMany(assembly => assembly.GetTypes())
            .Where(type => type.IsClass && !type.IsAbstract &&
                           type.IsSubclassOf(typeof(NatsService)))
            .ToList();

        // 2. 為每個服務建立 NATS 連線
        foreach (var serviceType in serviceTypes)
        {
            var channelAttribute = serviceType.GetCustomAttribute<ChannelAttribute>();
            var connection = channelAttribute?.Name != null
                ? scope.ServiceProvider.GetRequiredKeyedService<INatsConnection>(channelAttribute.Name)
                : scope.ServiceProvider.GetRequiredService<INatsConnection>();

            // 3. 註冊方法端點並開始監聽
            var natsMethods = service.GetNatsMethods();
            foreach (var natsMethod in natsMethods)
            {
                await SubscribeToMethod(connection, serviceType, natsMethod, stoppingToken);
            }
        }
    }
}

錯誤處理與回應

ErrorOr 結果處理

public async Task HandleMessage(NatsMethodInfo methodInfo, NatsMsg<string> msg)
{
    var result = methodInfo.Method.Invoke(service, args);
    
    if (result is Task task && task.GetType().IsGenericType)
    {
        var taskResult = task.GetType().GetProperty("Result")?.GetValue(task);
        
        if (IsErrorOrType(taskResult.GetType()))
        {
            var response = CreateNatsResponse(taskResult);
            await msg.ReplyAsync(JsonSerializer.Serialize(response));
        }
    }
}

private object CreateNatsResponse(object errorOrResult)
{
    var isError = (bool)errorOrResult.GetType().GetProperty("IsError")!.GetValue(errorOrResult)!;
    
    if (isError)
    {
        var errors = errorOrResult.GetType().GetProperty("Errors")!.GetValue(errorOrResult) as IEnumerable<Error>;
        return new NatsResponse<object>
        {
            IsSuccess = false,
            Data = null,
            Error = errors?.FirstOrDefault()?.Description ?? "Unknown error"
        };
    }
    else
    {
        var value = errorOrResult.GetType().GetProperty("Value")!.GetValue(errorOrResult);
        return new NatsResponse<object>
        {
            IsSuccess = true,
            Data = value,
            Error = null
        };
    }
}

總結

EdgeSync ServiceFramework 提供了兩種主要的實作方式:

  1. Auto Convention 模式: 適合快速開發,框架自動決定通訊模式和配置
  2. 手動 ServiceHandler 模式: 提供更細緻的控制,適合複雜的業務邏輯

選擇使用哪種方式取決於您的具體需求:

  • 簡單的 CRUD 操作建議使用 Auto Convention
  • 複雜的事件處理和工作流程建議使用手動 ServiceHandler
  • 兩種方式可以在同一個應用程式中混合使用

框架的設計讓開發者可以專注於業務邏輯,而無需擔心底層的 NATS 通訊細節。

Auto-Convention Decision Tree

flowchart TD
    A["(1) ReturnType 有無"] --> B["有"]
    A --> C["無"]
    
    B --> B1["檢查是否有 JetStreamPullAttribute"]
    B1 --> B2["有"]
    B1 --> B3["無"]
    B2 --> ERROR1["❌ 錯誤: Request/Response Mode<br/>不可使用 JetStreamPullAttribute"]
    B3 --> D["(2) Request/Response Mode"]
    
    C --> E["(3) parameter 是否為 Collection"]
    E --> F["是"]
    E --> G["否"]
    
    F --> H["(4) 是否有使用 JetStreamPullAttribute"]
    H --> I["是"]
    H --> J["否"]
    
    G --> K["(5) 是否有使用 JetStreamPullAttribute"]
    K --> L["是"]
    K --> M["否"]
    
    I --> I1["檢查是否有 JetStreamSubject"]
    I1 --> I2["有"]
    I1 --> I3["無"]
    I2 --> ERROR2["❌ 錯誤: Collection + JetStreamPullAttribute<br/>不可同時使用 JetStreamSubject"]
    I3 --> N["(14) Pub/Sub Pull Mode with JetStream"]
    
    J --> O["(6) 是否有使用 JetStreamSubject"]
    O --> P["是"]
    O --> Q["否"]
    
    L --> L1["檢查是否有 JetStreamSubject"]
    L1 --> L2["有"]
    L1 --> L3["無"]
    L2 --> ERROR3["❌ 錯誤: Non-Collection + JetStreamPullAttribute<br/>不可同時使用 JetStreamSubject"]
    L3 --> R["(15) Pub/Sub Pull Mode with JetStream"]
    
    M --> S["(7) 是否有使用 JetStreamSubject"]
    S --> T["是"]
    S --> U["否"]
    
    P --> V["(8) Pub/Sub Push Mode with JetStream"]
    
    Q --> W["(9) 看 AutoConventionOption.DefaultStreamEnabled"]
    W --> X["是"]
    W --> Y["否"]
    
    T --> Z["(10) Pub/Sub Pull Mode with JetStream"]
    
    U --> AA["(11) 看 AutoConventionOption.DefaultStreamEnabled"]
    AA --> BB["是"]
    AA --> CC["否"]
    
    X --> DD["(12) Pub/Sub Push Mode with JetStream"]
    Y --> EE["(13) Pub/Sub Push Mode classic"]
    BB --> FF["(16) Pub/Sub Pull Mode with JetStream"]
    CC --> ERROR4["❌ 錯誤: Pull Mode 需要 JetStream<br/>無法在 non-js 環境下使用"]
    
    classDef errorStyle fill:#ffcccc,stroke:#ff0000,stroke-width:2px
    class ERROR1,ERROR2,ERROR3,ERROR4 errorStyle