目录
- Vert.x系列(零),开篇,认识Vert.x并创建一个Http服务
- Vert.x系列(一),Http Router
- Vert.x系列(二),编写REST API并解析请求参数
- Vert.x系列(三),加载静态资源文件
- Vert.x系列(四),EventBus事件总线
- Vert.x系列(五),基于EventBus构建分布式应用
正文
EventBus也叫事件总线,效果如上图所示。发布者发布事件到总线,订阅者从总线订阅事件,然后处理事件,响应事件。看起来有点像一个消息队列系统,都有发布者和订阅者,但这一切都是Vert.x内部的一个实现,可以说是最经典的功能之一。另外这里关于名词作一点说明:发布者可以叫做生产者,订阅者也可以叫消费者。
EventBus一般用于什么场合呢?
异步处理和分布式应用。
所谓异步处理,就是事件消息发送出去,并不关心其处理结果,交由订阅者后台慢慢处理。在当前版本的Vert.x体现的方法就是publisher和send方法。
发布者代码一,publish方法
vertx.eventBus().publish("address", "msg");
发布者代码二,send方法
vertx.eventBus().send("address", "msg");
在这可以看到发布者的实现有两种方法,publish和send。它们之间是有区别的。publish发出去的消息,所有的订阅者都会收到,而send是点对点,只有一个订阅者会收到,该使用哪种方式,要根据实际情况选择。
订阅者代码
MessageConsumer<String> consumer = vertx.eventBus().consumer("address");
consumer.handler(msg ->{
System.out.println(msg.body());
});
看到这里,可能有读者会问,订阅者处理完发布者的消息,可以回应发布者吗?发布者可以获取到订阅者的回应吗?
答案是可以的。
接下来看消费者如何回应发布者:
MessageConsumer<String> consumer = vertx.eventBus().consumer("address");
consumer.handler(msg ->{
System.out.println(msg.body());
msg.reply("success");
});
从代码里面可以看到msg有一个reply方法,这个方法就是用来做出回应的。
再看发布者如何获取订阅者的回应:
vertx.eventBus().request("address", "msg", rh -> {
if (rh.succeeded()) {
System.out.println(rh.result().body());
}
});
这里要留意发布者使用的request方法,不是publish或者send了。
表情图片来自网络
!_!:纳尼!!前面不是说发布者是用publish和send发送事件消息的吗,这里怎么变request了? 作者你的节操呢?
其实笔者也挺委屈的,在以前的版本中send方法是支持获取响应的,但在当前版本中已经被标记为过时方法了。这里应采用request方法。 request方法提供了一个Handler<AsyncResult<Message>>的参数,这个参数就是用来获取订阅者响应的。
=:好吧,那么可爱的读者宝宝就暂时原谅你了。那你说了这么多,能整一个案例不?倒是让本宝宝吃一个栗子啊。
举个例子,假设Http verticle有一个接口/user/:id
,当接口获取到id参数后,通过EventBus,将参数发送到一个DatabaseVerticle里面,在这个verticle里面有一个订阅者,收到参数后就去数据库里面查询对应的数据,然后返回给Http verticle中的接口,并返回到前端。
DatabaseVerticle.java
package com.javafm.vertx;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
public class DatabaseVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
super.start();
MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("com.javafm.vertx.database");
consumer.handler(msg -> {
// 加装从数据库查询出数据,然后返回
System.out.println(msg.body());
JsonObject json = new JsonObject();
json.put("id", 1);
json.put("name", "dev-tang");
msg.reply(json);
});
}
}
HttpVerticle.java
package com.javafm.vertx;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
public class HttpVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create());
router.get("/user/:id").handler(this::getUser);
server.requestHandler(router);
server.listen(8080);
}
private void getUser(RoutingContext ctx) {
int id = Integer.parseInt(ctx.request().getParam("id"));
JsonObject json = new JsonObject().put("id", id);
vertx.eventBus().request("com.javafm.vertx.database", json, r -> {
if (r.succeeded()){
out(ctx, r.result().body().toString());
} else {
r.cause().printStackTrace();
ctx.fail(r.cause());
}
});
}
private void out(RoutingContext ctx, String msg) {
ctx.response().putHeader("Content-Type", "application/json; charset=utf-8")
.end(msg);
}
}
App.java
package com.javafm.vertx;
import io.vertx.core.Vertx;
public class App {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
vertx.deployVerticle(new DatabaseVerticle());
vertx.deployVerticle(new HttpVerticle());
}
}
App.java是项目启动类,在这个类里面发布两个Verticle。接着我们启动这个项目,在浏览器访问接口:http://localhost:8080/user/1
,出现如图1-1所示则表示结果是正确的。
图1-1
本文章中体现了EventBus异步处理,如果要做分布式应用开发又是怎么搞的呢?请听下回分解。
本节源代码地址:https://gitee.com/dev-tang/learning-vertx/tree/learn-05/