Vert.x 之 Event Bus (事件总线跨网络集群通讯Demo)

/ 后端 / 没有评论 / 455浏览

当不同的Verticle运行在同一个Vertx实例中时,可以直接使用Event Bus在各个Verticle(各自的事件循环)之间进行消息通讯;如果Vertx实例运行在不同主机(即不同的网络节点)上,则需要使用以下集群方式。

此处演示的是在同一局域网下,使用不同端口部署Vertx的web服务。

项目依赖

<properties>
    <!-- Source files are read as UTF-8. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <!-- Versions of the Maven build plugins (the plugin declarations themselves are not shown in this snippet). -->
    <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
    <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
    <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
    <exec-maven-plugin.version>3.0.0</exec-maven-plugin.version>

    <!-- Vert.x version referenced by the vertx-stack-depchain BOM import as ${vertx.version}. -->
    <vertx.version>4.4.2</vertx.version>
    <junit-jupiter.version>5.9.1</junit-jupiter.version>

    <!-- Fully-qualified names of the main verticle and the launcher class.
         NOTE(review): presumably consumed by the shade/exec plugin configuration, which is not visible here - confirm. -->
    <main.verticle>com.example.starter.verticle.SendVerticle</main.verticle>
    <launcher.class>io.vertx.core.Launcher</launcher.class>
  </properties>

  <!-- Imports the Vert.x stack BOM at ${vertx.version}; the io.vertx dependencies
       declared without a <version> element rely on this import to resolve their versions. -->
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>io.vertx</groupId>
        <artifactId>vertx-stack-depchain</artifactId>
        <version>${vertx.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <!-- Runtime dependencies: Vert.x core, Vert.x Web, and the Hazelcast cluster manager. -->
  <dependencies>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-core</artifactId>
    </dependency>
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-web</artifactId>
    </dependency>
    <!-- https://mvnrepository.com/artifact/io.vertx/vertx-hazelcast -->
    <!-- Kept on the same version as the rest of the Vert.x stack: a cluster manager
         built against a different Vert.x release than vertx-core risks incompatibilities. -->
    <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-hazelcast</artifactId>
      <version>${vertx.version}</version>
    </dependency>
  </dependencies>

1.发送端

MainSender

package com.example.starter.verticle.main;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;

/**
 * Entry point of the sending node: starts a clustered Vert.x instance and
 * deploys {@code SendVerticle} on it.
 *
 * <p>No cluster manager is configured on the options here.
 * NOTE(review): presumably the one provided by vertx-hazelcast on the classpath
 * is picked up automatically - confirm.
 */
public class MainSender  extends AbstractVerticle {

  /**
   * Starts the clustered Vert.x instance asynchronously and deploys the sender verticle.
   * Failures to join the cluster or to deploy the verticle are reported on standard
   * error instead of being silently dropped.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    VertxOptions options = new VertxOptions();
    Vertx.clusteredVertx(options, res -> {
      if (res.failed()) {
        // Without this the process would just sit there with no hint of what went wrong.
        System.err.println("Failed to start clustered Vert.x: " + res.cause());
        return;
      }
      Vertx vertx = res.result();
      vertx.deployVerticle("com.example.starter.verticle.SendVerticle", deployment -> {
        if (deployment.failed()) {
          System.err.println("Failed to deploy SendVerticle: " + deployment.cause());
        }
      });
    });
  }
}

SendVerticle

package com.example.starter.verticle;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;

/**
 * Verticle of the sending node: serves HTTP on port 8888 and forwards the URI of
 * every incoming request to the event bus address {@code "ceshi"}.
 */
public class SendVerticle extends AbstractVerticle {

  /**
   * Starts the HTTP server and completes {@code startPromise} once the server is
   * listening, or fails it with the bind error.
   *
   * @param startPromise promise signalling the outcome of the verticle start
   * @throws Exception declared by the overridden method; not thrown by this implementation
   */
  @Override
  public void start(Promise<Void> startPromise) throws Exception {

    vertx.createHttpServer().requestHandler(req -> {
      // Fire-and-forget: the message reply is never read here, so a request/reply
      // call would only leave a pending reply that ends in a timeout failure
      // when the consumer does not answer.
      vertx.eventBus().send("ceshi", req.uri());

      req.response()
        .putHeader("content-type", "text/plain")
        .end("Hello from Vert.x8888!");
    }).listen(8888, http -> {
      if (http.succeeded()) {
        startPromise.complete();
        System.out.println("HTTP server started on port 8888");
      } else {
        startPromise.fail(http.cause());
      }
    });
  }
}

2.接收端

MainConsumer

package com.example.starter.verticle.main;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;

/**
 * Entry point of the receiving node: starts a clustered Vert.x instance and
 * deploys {@code ConsumeVerticle} on it.
 *
 * <p>No cluster manager is configured on the options here.
 * NOTE(review): presumably the one provided by vertx-hazelcast on the classpath
 * is picked up automatically - confirm.
 */
public class MainConsumer  extends AbstractVerticle {

  /**
   * Starts the clustered Vert.x instance asynchronously and deploys the consumer verticle.
   * Failures to join the cluster or to deploy the verticle are reported on standard
   * error instead of being silently dropped.
   *
   * @param args command-line arguments (unused)
   * @throws Exception kept for signature compatibility; not thrown by this implementation
   */
  public static void main(String[] args) throws Exception {
    VertxOptions options = new VertxOptions();
    Vertx.clusteredVertx(options, res -> {
      if (res.failed()) {
        // Without this the process would just sit there with no hint of what went wrong.
        System.err.println("Failed to start clustered Vert.x: " + res.cause());
        return;
      }
      Vertx vertx = res.result();
      vertx.deployVerticle("com.example.starter.verticle.ConsumeVerticle", deployment -> {
        if (deployment.failed()) {
          System.err.println("Failed to deploy ConsumeVerticle: " + deployment.cause());
        }
      });
    });
  }

}

ConsumeVerticle

package com.example.starter.verticle;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;

/**
 * Verticle of the receiving node: prints every message delivered to the event bus
 * address {@code "ceshi"} and serves a plain-text greeting over HTTP on port 9999.
 */
public class ConsumeVerticle extends AbstractVerticle {

  /**
   * Registers the event bus consumer, then starts the HTTP server; the outcome of
   * the server bind is reported through {@code startPromise}.
   *
   * @param startPromise promise signalling the outcome of the verticle start
   * @throws Exception declared by the overridden method; not thrown by this implementation
   */
  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    registerConsumer();
    startHttpServer(startPromise);
  }

  /** Subscribes to the "ceshi" address and prints the body of each received message. */
  private void registerConsumer() {
    vertx.eventBus().consumer("ceshi", message ->
      System.out.println("接收到信息: " + message.body()));
  }

  /** Starts the HTTP server on port 9999 and resolves the promise with the bind result. */
  private void startHttpServer(Promise<Void> startPromise) {
    vertx.createHttpServer()
      .requestHandler(request ->
        request.response()
          .putHeader("content-type", "text/plain")
          .end("Hello from Vert.x9999!"))
      .listen(9999, bound -> {
        if (bound.failed()) {
          startPromise.fail(bound.cause());
          return;
        }
        startPromise.complete();
        System.out.println("HTTP server started on port 9999");
      });
  }
}