Skip to main content
版本:v1.10.0

EventMesh 工作流

业务场景

图中你正在构建一个简单的电商订单管理系统,系统能够接收和调配新的订单,调配流程需要处理所有的订单创建,付款处理以及发货处理。

为了实现高可用和高性能,你可以使用事件驱动架构(EDA)构建微服务应用去处理商店前端,订单管理,支付处理和发货管理。你可以在云上部署整个系统。要处理高并发,你可以利用消息系统缓冲,并扩展多个微服务实例。架构类似于:

Workflow Use Case

当每个微服务都在自己的事件通道上运行时,EventMesh 在执行事件编排方面发挥着至关重要的作用。

我们使用 CNCF Serverless 工作流 来描述此事件工作流编排。

CNCF Serverless 工作流

CNCF Serverless 工作流定义了一个厂商中立、开源和完全社区驱动的生态系统,用于定义和运行针对 Serverless 技术领域的基于 DSL 的工作流。

Serverless 工作流定义了一种领域特定语言(DSL)来描述有状态和无状态的基于工作流的 serverless 函数和微服务编排。

详见 官方 github

EventMesh 工作流

我们利用 Serverless 工作流 DSL 来描述 EventMesh 工作流。根据其规范,工作流由一系列用于描述控制流逻辑的工作流状态组成。目前,我们仅支持与事件相关的工作流状态。请参见 工作流 DSL 设计 中支持的状态。

工作流状态可以包含通用的操作,或在工作流执行期间应调用的服务/函数。这些操作可以引用可复用的函数定义(应如何调用这些函数/服务),还可以引用触发基于事件的服务调用的事件,以及要等待的事件,这些事件表示这种基于事件的服务调用完成。

在 EDA 解决方案中,我们通常使用 AsyncAPI 定义事件驱动的微服务。Serverless 工作流“函数”定义支持使用 AsyncAPI 定义调用语义。有关详细信息,请参见 Using Funtions for AsyncAPI Service

AsyncAPI

AsyncAPI 是一项开源计划,旨在改善事件驱动体系结构(EDA)的当前状态。我们的长期目标是让使用 EDA 和使用 REST API 一样容易。包括从文档到代码生成、发现到事件管理。现在应用于 REST API 的大多数流程也适用于事件驱动/异步 API。

详见 AsyncAPI 官网

工作流示例

在本示例中,我们构建了上面订单管理系统的事件驱动工作流。

首先,我们需要为我们的微服务应用定义 AsyncAPI。

  • 在线商店应用程序
asyncapi: 2.2.0
info:
title: Online Store application
version: '0.1.0'
channels:
store/order:
subscribe:
operationId: newStoreOrder
message:
$ref : '#/components/NewOrder'

  • 订单服务
asyncapi: 2.2.0
info:
title: Order Service
version: '0.1.0'
channels:
order/inbound:
publish:
operationId: sendOrder
message:
$ref : '#/components/Order'
order/outbound:
subscribe:
operationId: processedOrder
message:
$ref : '#/components/Order'
  • 支付服务
asyncapi: 2.2.0
info:
title: Payment Service
version: '0.1.0'
channels:
payment/inbound:
publish:
operationId: sendPayment
message:
$ref : '#/components/OrderPayment'
payment/outbound:
subscribe:
operationId: paymentReceipt
message:
$ref : '#/components/OrderPayment'
  • 物流服务
asyncapi: 2.2.0
info:
title: Shipment Service
version: '0.1.0'
channels:
shipment/inbound:
publish:
operationId: sendShipment
message:
$ref : '#/components/OrderShipment'

接下来,定义描述订单管理业务逻辑的订单工作流。

id: storeorderworkflow
version: '1.0'
specVersion: '0.8'
name: Store Order Management Workflow
states:
- name: Receive New Order Event
type: event
onEvents:
- eventRefs:
- NewOrderEvent
actions:
- eventRef:
triggerEventRef: OrderServiceSendEvent
resultEventRef: OrderServiceResultEvent
- eventRef:
triggerEventRef: PaymentServiceSendEvent
resultEventRef: PaymentServiceResultEvent
transition: Check Payment Status
- name: Check Payment Status
type: switch
dataConditions:
- name: Payment Successfull
condition: "${ .payment.status == 'success' }"
transition: Send Order Shipment
- name: Payment Denied
condition: "${ .payment.status == 'denied' }"
end: true
defaultCondition:
end: true
- name: Send Order Shipment
type: operation
actions:
- eventRef:
triggerEventRef: ShipmentServiceSendEvent
end: true
events:
- name: NewOrderEvent
source: file://onlineStoreApp.yaml#newStoreOrder
type: asyncapi
kind: consumed
- name: OrderServiceSendEvent
source: file://orderService.yaml#sendOrder
type: asyncapi
kind: produced
- name: OrderServiceResultEvent
source: file://orderService.yaml#processedOrder
type: asyncapi
kind: consumed
- name: PaymentServiceSendEvent
source: file://paymentService.yaml#sendPayment
type: asyncapi
kind: produced
- name: PaymentServiceResultEvent
source: file://paymentService.yaml#paymentReceipt
type: asyncapi
kind: consumed
- name: ShipmentServiceSendEvent
source: file://shipmentService.yaml#sendShipment
type: asyncapi
kind: produced

对应的工作流图如下:

Workflow Diagram

EventMesh 工作流引擎

在下面的体系结构图中,EventMesh 目录,EventMesh 工作流引擎 和 EventMesh Runtime 在三个不同的处理器中运行。

Workflow Architecture

运行工作流的步骤如下:

  1. 在环境中部署发布者和订阅者应用程序。 使用 AsyncAPI 描述应用程序 API,生成 asyncAPI yaml。 使用 AsyncAPI 在 EventMesh 目录中注册发布者和订阅者应用程序。

  2. 在 EventMesh 工作流引擎中注册 Serverless 工作流 DSL。

  3. 工作流引擎从 EventMesh 目录查询发布服务器和订阅服务器的需要的工作流 DSL函数

  4. 事件驱动 App 将事件发布到 EventMesh Runtime 触发工作流。EventMesh 工作流引擎发布和订阅事件、编排事件。

EventMesh Catalog 设计

EventMesh 目录存储发布者、订阅者和通道元数据。由以下模块组成:

  • AsyncAPI 解析器

    使用 AsyncAPI 社区提供的 SDK (tool list), 解析并验证 AsyncAPI yaml 输入,并生成 AsyncAPI 定义。

  • 发布者,通道,订阅者模块

    从 AsyncAPI 定义存储发布者、订阅者和通道信息。

EventMesh 工作流引擎设计

工作流引擎由以下模块组成:

  • 工作流解析器

    使用 Serverless Workflow 社区提供的 SDK(SDKs), 解析和验证工作流 DSL 输入,并生成工作流定义。

  • 工作流模块

    管理工作流实例的生命周期,从创建、启动、停止到销毁。

  • 状态模块

    管理工作流状态生命周期。支持与事件相关的状态,and the supported state list below is Work-in-Progress.

    工作流状态描述
    Operation执行 Actions 中定义的 AsyncAPI 函数
    Event检查定义的事件是否匹配,如果匹配,执行定义的 AsyncAPI 函数
    Switch检查事件是否与事件条件匹配,并执行定义的 AsyncAPI 函数
    Parallel并行执行定义的 AsyncAPI 函数
    ForEach迭代输入集合并执行定义的 AsyncAPI 函数
  • 行为模块

    管理函数中的行为。

  • 函数模块

    通过在 EventMesh Runtime 中创建发布者和/或订阅者来管理 AsyncAPI 函数,并管理发布者/订阅者生命周期。

    AsyncAPI 操作EventMesh Runtime
    PublishPublisher
    SubscribeSubscriber
  • 事件模块

    使用工作流 DSL 中定义的规则管理 CloudEvent 数据模型,包括事件过滤器、关联和转换。

  • 重试模块

    管理事件发布到 EventMesh Runtime 的重试逻辑。