Skip to main content
版本:v1.8.0

TCP 协议

EventMesh Java SDK 实现了同步、异步和广播 TCP 消息的生产者和消费者。 二者都需要一个 EventMeshHttpClientConfig 类实例来指定 EventMesh TCP 客户端的配置信息。其中的 hostport 字段需要和 EventMesh runtime eventmesh.properties 文件中的相匹配。

import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import io.cloudevents.CloudEvent;

public class AsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
public static void main(String[] args) throws InterruptedException {
EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
.host(eventMeshIp)
.port(eventMeshTcpPort)
.userAgent(userAgent)
.build();
/* ... */
}
}

TCP 消费者

消费者应该实现 ReceiveMsgHook 类,其被定义在 ReceiveMsgHook.java

public interface ReceiveMsgHook<ProtocolMessage> {
Optional<ProtocolMessage> handle(ProtocolMessage msg);
}

EventMeshTCPClient 实现了 subscribe 方法。该方法接收话题、SubscriptionModeSubscriptionTypehandle 方法将会在消费者从订阅的话题中收到消息时被调用。如果 SubscriptionTypeSYNChandle 的返回值将被发送回生产者。

import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import io.cloudevents.CloudEvent;

public class TCPConsumer implements ReceiveMsgHook<CloudEvent> {
public static TCPConsumer handler = new TCPConsumer();
private static EventMeshTCPClient<CloudEvent> client;

public static void main(String[] args) throws Exception {
client = EventMeshTCPClientFactory.createEventMeshTCPClient(
eventMeshTcpClientConfig,
CloudEvent.class
);
client.init();

client.subscribe(
"eventmesh-sync-topic",
SubscriptionMode.CLUSTERING,
SubscriptionType.SYNC
);

client.registerSubBusiHandler(handler);
client.listen();
}

@Override
public Optional<CloudEvent> handle(CloudEvent message) {
log.info("Messaged received: {}", message);
return Optional.of(message);
}
}

TCP 生产者

异步生产者

EventMeshTCPClient 实现了 public 方法。该方法接收将被发布的消息和一个可选的 timeout 值,并返回来自消费者的响应消息。

/* ... */
client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();

CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
client.publish(event, 1000);

同步生产者

EventMeshTCPClient 实现了 rr 方法。该方法接收将被发布的消息和一个可选的 timeout 值,并返回来自消费者的响应消息。

/* ... */
client = EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
client.init();

CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();

Package response = client.rr(event, 1000);
CloudEvent replyEvent = EventFormatProvider
.getInstance()
.resolveFormat(JsonFormat.CONTENT_TYPE)
.deserialize(response.getBody().toString().getBytes(StandardCharsets.UTF_8));