Vert.x系列(四),EventBus事件总线

目录



正文


EventBus也叫事件总线,效果如上图所示。发布者发布事件到总线,订阅者从总线订阅事件,然后处理事件,响应事件。看起来有点像一个消息队列系统,都有发布者和订阅者,但这一切都是Vert.x内部的一个实现,可以说是最经典的功能之一。另外这里关于名词作一点说明:发布者可以叫做生产者,订阅者也可以叫消费者。

EventBus一般用于什么场合呢?

异步处理和分布式应用。

所谓异步处理,就是事件消息发送出去,并不关心其处理结果,交由订阅者后台慢慢处理。在当前版本的Vert.x体现的方法就是publisher和send方法。

发布者代码一,publish方法

vertx.eventBus().publish("address", "msg");

发布者代码二,send方法

vertx.eventBus().send("address", "msg");

在这可以看到发布者的实现有两种方法,publish和send。它们之间是有区别的。publish发出去的消息,所有的订阅者都会收到,而send是点对点,只有一个订阅者会收到,该使用哪种方式,要根据实际情况选择。

订阅者代码

MessageConsumer<String> consumer = vertx.eventBus().consumer("address");
consumer.handler(msg ->{
    System.out.println(msg.body());
});

看到这里,可能有读者会问,订阅者处理完发布者的消息,可以回应发布者吗?发布者可以获取到订阅者的回应吗?

答案是可以的。

接下来看消费者如何回应发布者:

MessageConsumer<String> consumer = vertx.eventBus().consumer("address");
consumer.handler(msg ->{
    System.out.println(msg.body());
    msg.reply("success");
});

从代码里面可以看到msg有一个reply方法,这个方法就是用来做出回应的。

再看发布者如何获取订阅者的回应:

vertx.eventBus().request("address", "msg", rh -> {
    if (rh.succeeded()) {
        System.out.println(rh.result().body());
    }
});

这里要留意发布者使用的request方法,不是publish或者send了。

表情图片来自网络

!_!:纳尼!!前面不是说发布者是用publish和send发送事件消息的吗,这里怎么变request了? 作者你的节操呢?

其实笔者也挺委屈的,在以前的版本中send方法是支持获取响应的,但在当前版本中已经被标记为过时方法了。这里应采用request方法。 request方法提供了一个Handler<AsyncResult<Message>>的参数,这个参数就是用来获取订阅者响应的。

=:好吧,那么可爱的读者宝宝就暂时原谅你了。那你说了这么多,能整一个案例不?倒是让本宝宝吃一个栗子啊。

举个例子,假设Http verticle有一个接口/user/:id,当接口获取到id参数后,通过EventBus,将参数发送到一个DatabaseVerticle里面,在这个verticle里面有一个订阅者,收到参数后就去数据库里面查询对应的数据,然后返回给Http verticle中的接口,并返回到前端。

DatabaseVerticle.java

package com.javafm.vertx;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;

public class DatabaseVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        super.start();

        MessageConsumer<JsonObject> consumer = vertx.eventBus().consumer("com.javafm.vertx.database");
        consumer.handler(msg -> {
            // 加装从数据库查询出数据,然后返回
            System.out.println(msg.body());

            JsonObject json = new JsonObject();
            json.put("id", 1);
            json.put("name", "dev-tang");
            msg.reply(json);
        });
    }
}

HttpVerticle.java

package com.javafm.vertx;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;


public class HttpVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        HttpServer server = vertx.createHttpServer();

        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());

        router.get("/user/:id").handler(this::getUser);

        server.requestHandler(router);
        server.listen(8080);
    }

    private void getUser(RoutingContext ctx) {
        int id = Integer.parseInt(ctx.request().getParam("id"));
        JsonObject json = new JsonObject().put("id", id);
        vertx.eventBus().request("com.javafm.vertx.database", json, r -> {
            if (r.succeeded()){
                out(ctx, r.result().body().toString());
            } else {
                r.cause().printStackTrace();
                ctx.fail(r.cause());
            }
        });
    }


    private void out(RoutingContext ctx, String msg) {
        ctx.response().putHeader("Content-Type", "application/json; charset=utf-8")
                .end(msg);
    }
}

App.java

package com.javafm.vertx;

import io.vertx.core.Vertx;

public class App {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(new DatabaseVerticle());
        vertx.deployVerticle(new HttpVerticle());
    }
}

App.java是项目启动类,在这个类里面发布两个Verticle。接着我们启动这个项目,在浏览器访问接口:http://localhost:8080/user/1,出现如图1-1所示则表示结果是正确的。

图1-1

本文章中体现了EventBus异步处理,如果要做分布式应用开发又是怎么搞的呢?请听下回分解。

本节源代码地址:https://gitee.com/dev-tang/learning-vertx/tree/learn-05/

本博客采用 知识共享署名-禁止演绎 4.0 国际许可协议 进行许可

本文标题:Vert.x系列(四),EventBus事件总线

本文地址:https://jizhong.plus/post/2020/03/vert.x-04.html