CloudEvents 集成
介绍
CloudEvents 是一种描述事件数据的格式规范,它提供了跨服务、平台与系统的互操作性。
截止至 2021 年 5 月,EventMesh 包含了以下主要组件:eventmesh-runtime, eventmesh-sdk-java 和 eventmesh-connector-rocketmq。
对于使用 EventMesh 的用户,eventmesh-runtime 可以被部署为微服务来在生产者和消费者间传输用户的事件。
用户的应用程序可以通过 eventmesh-sdk-java 来与 eventmesh-runtime 进行交互,即发布或订阅指定主题的事件。
EventMesh 的用户非常渴望能得到对 CloudEvents 的支持。有许多理由使得用户倾向于使用集成了 CloudEvents 支持的 SDK:
- CloudEvents 是一种更为广泛接受和支持的描述事件的方式。目前,
eventmesh-sdk-java使用的是LiteMessage结构 来描述事件,其标准化程度较低。 - CloudEvents 的 Java SDK 有更广泛的分发方式。比如,目前 EventMesh 的用户需要使用 SDK 的 tar 包,或对每个 EventMesh 的 发布版本从源码编译。有了 CloudEvents 的支持,用户可以更方便地通过 CloudEvents 的公开分发(比如,配置 Maven)来添加 EventMesh SDK 依赖项。
- CloudEvents 的 SDK 支持多种语言。尽管目前 EventMesh 只提供了 Java SDK,但在未来,如果要为更多语言提供支持,将 Java SDK 与 CloudEvents 绑定的经验将使工作变得容易。
需求
功能需求
| 需求 ID | 需求描述 | 备注 |
|---|---|---|
| F-1 | EventMesh 用户应能使用公共 SDK 依赖项来发布或订阅 CloudEvents 格式的事件 | 功能性 |
| F-2 | EventMesh 用户应能在提供了 CloudEvents 支持的 SDK 中继续使用现有的 EventMesh 客户端功能(如负载均衡) | 功能等价 |
| F-3 | EventMesh 的开发者应不需要付出特别多努力/痛苦来在 eventmesh-sdk-java 和提供了 CloudEvents 支持的 SDK 之间同步 | 可维护性 |
| F-4 | EventMesh 支持可插拔的协议,以便开发者整合其他协议(例如:CloudEvents / EventMesh MessageOpenMessage / MQTT...) | 功能性 |
| F-5 | EventMesh 支持统一的 API 以供从/向事件库发布或订阅事件 | 功能性 |
性能需求
| 需求 ID | 需求描述 | 备注 |
|---|---|---|
| P-1 | 提供了 CloudEvents 支持的 SDK 应具有与目前的 SDK 相近的客户端延迟 |
设计细节
与 CloudEvents 的 Java SDK 绑定(这与 Kafka 已经完成的工作类似,请在附录中的参考资料了解更多细节)是达成上述需求的一种简单方法。
可插拔协议

EventMesh 集成 CloudEvents 进度表
TCP
SDK 端发布
- 在
package首部中添加 CloudEvents 标识符 - 使用
CloudEventBuilder构造 CloudEvent,并将其放入package体中
SDK 端订阅
- 在
ReceiveMsgHook接口下添加convert函数,其用于将package体转换为具有package首部标识符的特定协议 - 不同协议应实现
ReceiveMsgHook接口
服务端发布
- 设计包含
decodeMessage接口的协议转换 API,其可以把包体转换为 CloudEvent - 更新
MessageTransferTask下的Session.upstreamMsg(),将入参Message改为CloudEvent,这使用了 上一步的decodeMessageAPI 来进行对 CloudEvent 的转换 - 更新
SessionSender.send(),将入参Message改为CloudEvent - 更新
MeshMQProducerAPI,支持在运行时发送CloudEvents - 在
connector-plugin中实现支持向 EventStore 中发送CloudEvents
服务端订阅
- 支持将连接器插件中的
RocketMessage改为 `CloudEvent - 重写
AsyncMessageListener.consume()函数,将入参Message改为CloudEvent - 更新
MeshMQPushConsumer.updateOffset(),将入参Message改为CloudEvent - 更新
DownStreamMsgContext,将入参Message改为CloudEvent,更新DownStreamMsgContext.ackMsg
HTTP
SDK 端发布
- 支持
LiteProducer.publish(cloudEvent) - 在 http 请求头中添加 CloudEvents 标识符
SDK 端订阅
服务端发布
- 支持根据
HttpCommand首部中的协议类型,通过可插拔的协议插件构造HttpCommand.body - 支持在消息处理器中发布 CloudEvent
服务端订阅
- 更新
EventMeshConsumer.subscribe() - 更新
HandleMsgContext, 将入参Message改为CloudEvent - 更新
AsyncHttpPushRequest.tryHTTPRequest()