Skip to main content
Version: v1.2.0

HTTP Protocol

EventMesh SDK for Java implements the HTTP producer and consumer of asynchronous messages. Both the producer and consumer require an instance of EventMeshHttpClientConfig class that specifies the configuration of EventMesh HTTP client. The liteEventMeshAddr, userName, and password fields should match the eventmesh.properties file of EventMesh runtime.

import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.ThreadUtils;

public class HTTP {
public static void main(String[] args) throws Exception {
EventMeshHttpClientConfig eventMeshClientConfig = EventMeshHttpClientConfig.builder()
.liteEventMeshAddr("localhost:10105")
.producerGroup("TEST_PRODUCER_GROUP")
.env("env")
.idc("idc")
.ip(IPUtils.getLocalAddress())
.sys("1234")
.pid(String.valueOf(ThreadUtils.getPID()))
.userName("eventmesh")
.password("password")
.build();
/* ... */
}
}

HTTP Consumer

The EventMeshHttpConsumer class implements the heartbeat, subscribe, and unsubscribe methods. The subscribe method accepts a list of SubscriptionItem that defines the topics to be subscribed and a callback URL.

import org.apache.eventmesh.client.http.consumer.EventMeshHttpConsumer;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import com.google.common.collect.Lists;

public class HTTP {
final String url = "http://localhost:8080/callback";
final List<SubscriptionItem> topicList = Lists.newArrayList(
new SubscriptionItem("eventmesh-async-topic", SubscriptionMode.CLUSTERING, SubscriptionType.ASYNC)
);

public static void main(String[] args) throws Exception {
/* ... */
eventMeshHttpConsumer = new EventMeshHttpConsumer(eventMeshClientConfig);
eventMeshHttpConsumer.heartBeat(topicList, url);
eventMeshHttpConsumer.subscribe(topicList, url);
/* ... */
eventMeshHttpConsumer.unsubscribe(topicList, url);
}
}

The EventMesh runtime will send a POST request that contains the message in the CloudEvents format to the callback URL. The SubController.java file implements a Spring Boot controller that receives and parses the callback messages.

HTTP Producer

The EventMeshHttpProducer class implements the publish method. The publish method accepts the message to be published and an optional timeout value. The message should be an instance of either of these classes:

  • org.apache.eventmesh.common.EventMeshMessage
  • io.cloudevents.CloudEvent
  • io.openmessaging.api.Message
import org.apache.eventmesh.client.http.producer.EventMeshHttpProducer;
import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.utils.JsonUtils;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

public class HTTP {
public static void main(String[] args) throws Exception {
/* ... */
EventMeshHttpProducer eventMeshHttpProducer = new EventMeshHttpProducer(eventMeshClientConfig);
Map<String, String> content = new HashMap<>();
content.put("content", "testAsyncMessage");

CloudEvent event = CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSubject("eventmesh-async-topic")
.withSource(URI.create("/"))
.withDataContentType("application/cloudevents+json")
.withType(EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME)
.withData(JsonUtils.serialize(content).getBytes(StandardCharsets.UTF_8))
.withExtension(Constants.EVENTMESH_MESSAGE_CONST_TTL, String.valueOf(4 * 1000))
.build();
eventMeshHttpProducer.publish(event);
}
}

Using Curl Command

You can also publish/subscribe event without eventmesh SDK.

Publish

curl -H "Content-Type:application/json" -X POST -d '{"name": "admin", "pass":"12345678"}' http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC

After you start the eventmesh runtime server, you can use the curl command publish the event to the specific topic with the HTTP POST method and the package body must be in JSON format. The publish url like (http://127.0.0.1:10105/eventmesh/publish/TEST-TOPIC-HTTP-ASYNC), and you will get the publish successful result.

Subscribe

curl -H "Content-Type:application/json" -X POST -d '{"url": "http://127.0.0.1:8088/sub/test", "consumerGroup":"TEST-GROUP", "topic":[{"mode":"CLUSTERING","topic":"TEST-TOPIC-HTTP-ASYNC","type":"ASYNC"}]}' http://127.0.0.1:10105/eventmesh/subscribe/local

After you start the eventmesh runtime server, you can use the curl command to subscribe the specific topic list with the HTTP POST method, and the package body must be in JSON format. The subscribe url like (http://127.0.0.1:10105/eventmesh/subscribe/local), and you will get the subscribe successful result. You should pay attention to the url field in the package body, which means you need to set up an HTTP service at the specified URL, you can see the example in the eventmesh-examples module.