Skip to main content
版本:v1.2.0

gRPC 协议

EventMesh Java SDK 实现了 gRPC 同步、异步和广播消息的生产者和消费者。二者都需要一个 EventMeshHttpClientConfig 类实例来指定 EventMesh gRPC 客户端的配置信息。其中的 liteEventMeshAddruserNamepassword 字段需要和 EventMesh runtime eventmesh.properties 文件中的相匹配。

import org.apache.eventmesh.client.grpc.config.EventMeshGrpcClientConfig;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import io.cloudevents.CloudEvent;

public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
public static void main(String[] args) throws InterruptedException {
EventMeshGrpcClientConfig eventMeshClientConfig = EventMeshGrpcClientConfig.builder()
.serverAddr("localhost")
.serverPort(10205)
.consumerGroup(ExampleConstants.DEFAULT_EVENTMESH_TEST_CONSUMER_GROUP)
.env("env").idc("idc")
.sys("1234").build();
/* ... */
}
}

gRPC 消费者

流消费者

EventMesh runtime 会将来自生产者的信息作为一系列事件流向流消费者发送。消费者应实现 ReceiveHook 类,其被定义在 ReceiveMsgHook.java

public interface ReceiveMsgHook<T> {
Optional<T> handle(T msg) throws Throwable;
String getProtocolType();
}

EventMeshGrpcConsumer 实现了 registerListenersubscribeunsubscribe 方法。subscribe 方法接收一个 SubscriptionItem 对象的列表,其中定义了要订阅的话题。registerListener 接收一个实现了 ReceiveMsgHook 的实例。handle 方法将会在消费者收到订阅的主题消息时被调用。如果 SubscriptionTypeSYNChandle 的返回值将被发送回生产者。

import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import io.cloudevents.CloudEvent;

public class CloudEventsAsyncSubscribe implements ReceiveMsgHook<CloudEvent> {
public static CloudEventsAsyncSubscribe handler = new CloudEventsAsyncSubscribe();
public static void main(String[] args) throws InterruptedException {
/* ... */
SubscriptionItem subscriptionItem = new SubscriptionItem(
"eventmesh-async-topic",
SubscriptionMode.CLUSTERING,
SubscriptionType.ASYNC
);
EventMeshGrpcConsumer eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);

eventMeshGrpcConsumer.init();
eventMeshGrpcConsumer.registerListener(handler);
eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem));
/* ... */
eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem));
}

@Override
public Optional<CloudEvent> handle(CloudEvent message) {
log.info("Messaged received: {}", message);
return Optional.empty();
}

@Override
public String getProtocolType() {
return EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME;
}
}

Webhook 消费者

EventMeshGrpcConsumersubscribe 方法接收一个 SubscriptionItem 对象的列表,其中定义了要订阅的主题和一个可选的 timeout 值。如果提供了回调 URL,EventMesh runtime 将向回调 URL 地址发送一个包含 CloudEvents 格式 消息的 POST 请求。SubController.java 实现了一个接收并解析回调信息的 Spring Boot controller。

import org.apache.eventmesh.client.grpc.consumer.EventMeshGrpcConsumer;
import org.apache.eventmesh.client.grpc.consumer.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;

@Component
public class SubService implements InitializingBean {
final String url = "http://localhost:8080/callback";

public void afterPropertiesSet() throws Exception {
/* ... */
eventMeshGrpcConsumer = new EventMeshGrpcConsumer(eventMeshClientConfig);
eventMeshGrpcConsumer.init();

SubscriptionItem subscriptionItem = new SubscriptionItem(
"eventmesh-async-topic",
SubscriptionMode.CLUSTERING,
SubscriptionType.ASYNC
);

eventMeshGrpcConsumer.subscribe(Collections.singletonList(subscriptionItem), url);
/* ... */
eventMeshGrpcConsumer.unsubscribe(Collections.singletonList(subscriptionItem), url);
}
}

gRPC 生产者

异步生产者

EventMeshGrpcProducer 实现了 publish 方法。publish 方法接收将被发布的消息和一个可选的 timeout 值。消息应是下列类的一个实例:

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent
/* ... */
EventMeshGrpcProducer eventMeshGrpcProducer = new EventMeshGrpcProducer(eventMeshClientConfig);
eventMeshGrpcProducer.init();

Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
eventMeshGrpcProducer.publish(event);

同步生产者

EventMeshGrpcProducer 实现了 requestReply 方法。requestReply 方法接收将被发布的消息和一个可选的 timeout 值。方法会返回消费者返回的消息。消息应是下列类的一个实例:

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent

批量生产者

EventMeshGrpcProducer 重写了 publish 方法,该方法接收一个将被发布的消息列表和一个可选的 timeout 值。列表中的消息应是下列类的一个实例:

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent
/* ... */
List<CloudEvent> cloudEventList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject(ExampleConstants.EVENTMESH_GRPC_ASYNC_TEST_TOPIC)
.withSource(URI.create("/"))
.withDataContentType(ExampleConstants.CLOUDEVENT_CONTENT_TYPE)
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();

cloudEventList.add(event);
}

eventMeshGrpcProducer.publish(cloudEventList);
/* ... */