Skip to main content
版本:v1.7.0

TCP协议文档

1. 协议格式

dataFlow

消息组成详解:

魔术字:9位,当前值为“EventMesh”

通信协议版本号:4位,当前值为“0000”

消息总长度值(length):4位,int类型

消息头长度值(headerLength):4位,int类型

消息头(header):长度 = headerLength

消息体(body):长度 = length - headerLength - 4 - 4

2. 业务逻辑层

  • 消息组成

消息头(header)+ 消息体(body)

public class Package {

private Header header;
private Object body;
}


public class Header {

private Command cmd;
private int code;
private String msg;
private String seq;
}
  • 详解

消息头(header):类型为Header,Header中有Command字段,用于区分不同的消息类型

消息体(body):对于不同的消息类型,body的类型不同

消息命令字body类型
HEARTBEAT_REQUEST, HEARTBEAT_RESPONSE, HELLO_RESPONSE, CLIENT_GOODBYE_REQUEST, CLIENT_GOODBYE_RESPONSE, SERVER_GOODBYE_REQUEST, SERVER_GOODBYE_RESPONSE, LISTEN_REQUEST, LISTEN_RESPONSE, UNSUBSCRIBE_REQUEST, SUBSCRIBE_RESPONSE, UNSUBSCRIBE_RESPONSE, ASYNC_MESSAGE_TO_SERVER_ACK, BROADCAST_MESSAGE_TO_SERVER_ACK
HELLO_REQUESTUserAgent
SUBSCRIBE_REQUESTSubscription
REQUEST_TO_SERVER, REQUEST_TO_CLIENT, RESPONSE_TO_SERVER, RESPONSE_TO_CLIENT, ASYNC_MESSAGE_TO_SERVER, ASYNC_MESSAGE_TO_CLIENT, BROADCAST_MESSAGE_TO_SERVER, BROADCAST_MESSAGE_TO_CLIENT, ASYNC_MESSAGE_TO_CLIENT_ACK, BROADCAST_MESSAGE_TO_CLIENT_ACK, RESPONSE_TO_CLIENT_ACK, REQUEST_TO_CLIENT_ACKOpenMessage
REDIRECT_TO_CLIENTRedirectInfo

3. Client 与 Eventmesh-Runtime(Server)交互场景详解

public enum Command {

//心跳
HEARTBEAT_REQUEST(0), //client发给server的心跳包
HEARTBEAT_RESPONSE(1), //server回复client的心跳包

//握手
HELLO_REQUEST(2), //client发给server的握手请求
HELLO_RESPONSE(3), //server回复client的握手请求

//断连
CLIENT_GOODBYE_REQUEST(4), //client主动断连时通知server
CLIENT_GOODBYE_RESPONSE(5), //server回复client的主动断连通知
SERVER_GOODBYE_REQUEST(6), //server主动断连时通知client
SERVER_GOODBYE_RESPONSE(7), //client回复server的主动断连通知

//订阅管理
SUBSCRIBE_REQUEST(8), //client发给server的订阅请求
SUBSCRIBE_RESPONSE(9), //server回复client的订阅请求
UNSUBSCRIBE_REQUEST(10), //client发给server的取消订阅请求
UNSUBSCRIBE_RESPONSE(11), //server回复client的取消订阅请求

//监听
LISTEN_REQUEST(12), //client发给server的启动监听请求
LISTEN_RESPONSE(13), //server回复client的监听请求

//RR
REQUEST_TO_SERVER(14), //client将RR请求发送给server
REQUEST_TO_CLIENT(15), //server将RR请求推送给client
REQUEST_TO_CLIENT_ACK(16), //client收到RR请求后ACK给server
RESPONSE_TO_SERVER(17), //client将RR回包发送给server
RESPONSE_TO_CLIENT(18), //server将RR回包推送给client
RESPONSE_TO_CLIENT_ACK(19), //client收到回包后ACK给server

//异步事件
ASYNC_MESSAGE_TO_SERVER(20), //client将异步事件发送给server
ASYNC_MESSAGE_TO_SERVER_ACK(21), //server收到异步事件后ACK给client
ASYNC_MESSAGE_TO_CLIENT(22), //server将异步事件推送给client
ASYNC_MESSAGE_TO_CLIENT_ACK(23), //client收到异步事件后ACK给server

//广播
BROADCAST_MESSAGE_TO_SERVER(24), //client将广播消息发送给server
BROADCAST_MESSAGE_TO_SERVER_ACK(25), //server收到广播消息后ACK给client
BROADCAST_MESSAGE_TO_CLIENT(26), //server将广播消息推送给client
BROADCAST_MESSAGE_TO_CLIENT_ACK(27), //client收到广播消息后ACK给server

//重定向指令
REDIRECT_TO_CLIENT(30), //server将重定向指令推动给client
}

4. Client发起交互

场景Client向Server发送消息命令字Server回复Client消息的命令字说明
握手HELLO_REQUESTHELLO_RESPONSE
心跳HEARTBEAT_REQUESTHEARTBEAT_RESPONSE
订阅SUBSCRIBE_REQUESTSUBSCRIBE_RESPONSE
取消订阅UNSUBSCRIBE_REQUESTUNSUBSCRIBE_RESPONSE
开始监听消息LISTEN_REQUESTLISTEN_RESPONSE
发送RR请求REQUEST_TO_SERVERRESPONSE_TO_CLIENT
发送RR回包RESPONSE_TO_SERVER
发送异步事件ASYNC_MESSAGE_TO_SERVERASYNC_MESSAGE_TO_SERVER_ACK
发送广播事件BROADCAST_MESSAGE_TO_SERVERBROADCAST_MESSAGE_TO_SERVER_ACK
客户端主动断连CLIENT_GOODBYE_REQUESTCLIENT_GOODBYE_RESPONSE

5. Server发起交互

场景Server向Client发送消息命令字Client回复Server消息命令字说明
客户端接收RR请求REQUEST_TO_CLIENTREQUEST_TO_CLIENT_ACK
客户端接收RR回包RESPONSE_TO_CLIENTRESPONSE_TO_CLIENT_ACK
客户端接收异步事件ASYNC_MESSAGE_TO_CLIENTASYNC_MESSAGE_TO_CLIENT_ACK
客户端接收广播事件BROADCAST_MESSAGE_TO_CLIENTBROADCAST_MESSAGE_TO_CLIENT_ACK
服务端主动断连SERVER_GOODBYE_REQUEST
服务端进行重定向REDIRECT_TO_CLIENT

6. 消息类型

  • 发送RR消息

rr-msg

  • 发送异步单播消息

async-msg

  • 发送广播消息

broadcast-msg

HTTP协议文档

Java类LiteMessagecontent字段表示一个特殊的协议,因此,如果您要使用eventmesh-sdk-java的http-client,则只需设计协议的content即可。LiteMessage组成如下:

public class LiteMessage {

private String bizSeqNo;

private String uniqueId;

private String topic;

private String content;

private Map<String, String> prop;

private long createTime = System.currentTimeMillis();
}

1. 消息发送方式与组成

消息发送方式:POST方式

消息组成:请求头(RequestHeader) + 请求体(RequestBody)

  • 心跳消息

RequestHeader

Key说明
Envclient所属环境
Regionclient所属区域
Idcclient所属IDC
Dcnclient所在DCN
Sysclient所属子系统
Pidclient进程号
Ipclient Ip
Usernameclient 用户名
Passwdclient 密码
Version协议版本
Language语言描述
Code请求码

RequestBody

Key说明
clientType客户端类型
heartbeatEntities心跳实体,包含topic、url等信息
  • 订阅消息:

RequestHeader

与心跳消息一致

RequestBody

Key说明
topic客户端订阅的topic
urltopic对应的url
  • 取消订阅消息:

RequestHeader

与心跳消息一致

RequestBody

与订阅消息一致

  • 发送异步事件:

RequestHeader

与心跳消息一致

RequestBody

Key说明
topic客户端请求的topic
content客户端发送的topic的内容
ttl客户端请求超时时间
bizSeqNo客户端请求业务流水号
uniqueId客户端请求消息唯一标识

2. Client发起交互

场景Client向Server发送消息请求码Server回复Client消息的响应码说明
心跳HEARTBEAT(203)SUCCESS(0)/EVENTMESH_HEARTBEAT_ERROR(19)
订阅SUBSCRIBE(206)SUCCESS(0)/EVENTMESH_SUBSCRIBE_ERROR(17)
取消订阅UNSUBSCRIBE(207)SUCCESS(0)/EVENTMESH_UNSUBSCRIBE_ERROR(18)
发送异步事件MSG_SEND_ASYNC(104)SUCCESS(0)/EVENTMESH_SEND_ASYNC_MSG_ERR(14)

3. Server发起交互

场景Server向Client发送消息请求码Client回复Server消息响应码说明
客户端接收异步事件HTTP_PUSH_CLIENT_ASYNC(105)retCoderetCode值为0时代表成功

gRPC 协议文档

1. protobuf

eventmesh-protocol-gprc 模块有 Eventmesh gRPC 客户端的 protobuf 文件. the protobuf 文件路径是 /src/main/proto/eventmesh-client.proto.

用gradle build 生成 gRPC 代码在 /build/generated/source/proto/main. 生成代码用于 eventmesh-sdk-java 模块.

2. gRPC 数据模型

  • 消息

以下消息数据模型用于 publish(), requestReply()broadcast() APIs.

message RequestHeader {
string env = 1;
string region = 2;
string idc = 3;
string ip = 4;
string pid = 5;
string sys = 6;
string username = 7;
string password = 8;
string language = 9;
string protocolType = 10;
string protocolVersion = 11;
string protocolDesc = 12;
}

message SimpleMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;
string content = 4;
string ttl = 5;
string uniqueId = 6;
string seqNum = 7;
string tag = 8;
map<string, string> properties = 9;
}

message BatchMessage {
RequestHeader header = 1;
string producerGroup = 2;
string topic = 3;

message MessageItem {
string content = 1;
string ttl = 2;
string uniqueId = 3;
string seqNum = 4;
string tag = 5;
map<string, string> properties = 6;
}

repeated MessageItem messageItem = 4;
}

message Response {
string respCode = 1;
string respMsg = 2;
string respTime = 3;
}
  • 订阅

以下订阅数据模型用于 subscribe()unsubscribe() APIs.

message Subscription {
RequestHeader header = 1;
string consumerGroup = 2;

message SubscriptionItem {
enum SubscriptionMode {
CLUSTERING = 0;
BROADCASTING = 1;
}

enum SubscriptionType {
ASYNC = 0;
SYNC = 1;
}

string topic = 1;
SubscriptionMode mode = 2;
SubscriptionType type = 3;
}

repeated SubscriptionItem subscriptionItems = 3;
string url = 4;
}
  • 心跳

以下心跳数据模型用于 heartbeat() API.

message Heartbeat {
enum ClientType {
PUB = 0;
SUB = 1;
}

RequestHeader header = 1;
ClientType clientType = 2;
string producerGroup = 3;
string consumerGroup = 4;

message HeartbeatItem {
string topic = 1;
string url = 2;
}

repeated HeartbeatItem heartbeatItems = 5;
}

3. gRPC 服务接口

  • 事件生产端服务 APIs
service PublisherService {
# 异步事件生产
rpc publish(SimpleMessage) returns (Response);

# 同步事件生产
rpc requestReply(SimpleMessage) returns (Response);

# 批量事件生产
rpc batchPublish(BatchMessage) returns (Response);
}
  • 事件消费端服务 APIs
service ConsumerService {
# 所消费事件通过 HTTP Webhook推送事件
rpc subscribe(Subscription) returns (Response);

# 所消费事件通过 TCP stream推送事件
rpc subscribeStream(Subscription) returns (stream SimpleMessage);

rpc unsubscribe(Subscription) returns (Response);
}
  • 客户端心跳服务 API
service HeartbeatService {
rpc heartbeat(Heartbeat) returns (Response);
}