您的位置 首页 > 娱乐休闲

Vert.x3 集群模式探究

Vert.x集群原理

Vert.x在1.0的时候就已经支持集群,那时候Vert.x的集群是用的是HAzelCast,一个分布式内存数据库,而且是与Vertx-Core绑定在一起,也就是说默认就支持集群, 到了Vert.x2.0的时候,作者发现HazelCast没有想象的那么稳定,于是把集群的逻辑抽成SPI,剥离了核心代码,从而可以让用户自己实现集群的方式,比如(jgroup, zookeeper) 。到了Vert.x3.0的时候它就已经彻底解耦,并且有了至少4种集群实现方式。 上面是Vert.x集群的简单历史,那Vert.x到底是如何实现集群的呢,其实也很简单,Vert.x集群模式一般用在分布式的EventBus,即你可以在两个不同的机器上启动JVM,然后从机器A发消息给B,B也可以做reply给A,从而实现一个简单的类似Actor模型的通信系统。 上面提到了Vert.x集群模式需要依赖Vertx-Hazelcast-Manager,HazelCast可以跨JVM提供分布式内存数据管理,这样的话你可以把机器A的内存数据共享给机器B。说到这里有人会很好奇,Vert.x是不是通过HazelCast来共享EventBus的通信数据呢?其实不是的。

Vert.x的集群模式通信方式还是基于TCP的点对点方式,比如A注册了一个EventBus地址用于消费发给他的消息。

ver().consumer("someAddress", message -> {

Sy("I have received a message: " + me());

});

随后在Machine B上通过EventBus发消息给 someAddress

//Machine B

even("someAddress", "Hi you.");

这时machine A会打印出 I have received a message: Hi you。但是消息不是走 HazelCast 发出去的,而是走TCP协议,那B是怎么知道A的IP地址与端口呢,聪明的你现在肯定知道HazelCast的用处了,不错,A以集群方式启动的时候,发现自己需要注册地址消费消息,便自己先启动一个NetServer 同时生成一个 Map 结构的数据,Key就是 someAddress,而对应的Value就是A的 NetServer 地址与端口,在Vert.x里是一个以 ServerID 为结构的数据,里面就是 IP地址与端口。然后通过 HazelCast 共享给 B机器,那么B机器就有了这个 Map 结构数据,当B要发消息给someAddress 的时候会从集群里获取这个 Key,这样根据Value就知道目标机器的 IP地址与端口了,于是B再通过Vert.x的API NetClient 建立一个TCP连接把消息发出去即可,是不是很简单呢。

这里你会好奇,如何存在机器C也注册了 someAddress 怎么办呢,因为HashMap是一个Key对应一个Value的呀,恩,其实Vert.x的分布式EventBus并不是使用的传统的HashMap来共享集群各个节点的 ServerID,而是使用了 AsyncMultiMap 这个结构,即一个Key对应多个Value,你可以简单的想象成 Map<String, List>就可以了。这样通过实现简单Round-Robin算法,可以实现基本的负载均衡奥,于是你的A与C节点会轮流接受到来自B的请求。

与此同时,如果C节点发生了意外退出了,则他的node信息会在整个 HazelCast集群里丢失,从而触发一个 member leave 消息,相应的节点 B, A只要接受并处理这个信息就可以了,这里A与B都有 C 节点的 ServerID 缓存数据,所以侦听到 member leave消息后直接删除本地缓存就可以了,这样EventBus再获取 List发消息到 someAddress 会自动避开不存在的 C 节点,从而达到快速的failOver。

当然了Vert.x集群还提供了高可用,只需要对Verticle配置 HA 参数即可,还是以上面的C挂了为例子,如果集群配置了HA,则Vert.x会让他在A节点起来。 这里再详细说明一下,这里C节点的Verticle会在A节点起来,是有一个要求的,即A节点需要有C节点对应的Verticle Class文件,我们一般会在不同的机器上部署相同的Vertcile实例从而达到高可用,所以C节点挂了后,Vert.x集群会自动选择 A节点通过Ver方法再部署一个一样的Verticle,此时A节点可能会多增加了一个 Verticle 实例奥。 此外 HA 模式下还可以指定的 group name,什么意思呢。

我们举一个场景可能会更好点,假设A节点负荷很重了,挂了的C根本没有资源在A上起来,我们希望他在另外一台机器上D起来,那么你可以设置 C,D的group name为一样的名字比如 my-group,这样的话虽然 A C D节点都有一样的Verticle,但是group name不一样,C挂了只会在与自己group name一样的机器上起来,即D机器。

上面这种failover是比较简单粗暴的方式,只考虑group name,而现实中我们要考虑资源等情况,所以在我的项目里不会让Vert.x管理高可用,而是交由其他第三方来管理,有机会我们可以深入聊聊。

以上就是Vert.x的EventBus集群原理,下面我们看看他是怎么启动的。

Vertx集群启动方式

有两种方式启动Vertx的集群模式,一种是基于命令行启动你的Verticle,还有一种是基于嵌入式开发的模式,需要在代码里指定集群启动。 我一般都用命令行方式启动Verticle。强烈建议大家在跑集群模式前,先关闭防火墙

vertx run MyVerticle -cluster -cluster-host 192.168.1.30

执行上面命令的时候需要确保你的依赖里有ver这个文件,他是Vert.x默认集群实现的jar包,此外你还要注意你的Vertx_home里要有 clu 文件,此文件是 HazelCast 的集群配置文件,你的集群配置全都在这里。 默认HazelCast使用 UDP 广播的方式来实现集群信息共享,所以只要你的Verticle都在一个网段里可以收到广播包就可以互相共享信息了,当然HazelCast也支持TCP的方式在节点间互相共享信息。

如果要指定TCP方式,则需要修改相关的配置文件,下面是个例子

<join>

//这里默认是UDP广播的方式,我们把他设置false,关掉

<multicast enabled="false">

<multicast-group>224.2.2.3</multicast-group>

<multicast-port>54327</multicast-port>

</multicast>

//这里启用TCP方式

<tcp-ip enabled="true">

//这里比较麻烦,有多个目标节点,就需要写多少地址

<interface>192.168.1.28</interface>

<interface>192.168.1.29</interface>

</tcp-ip>

<aws enabled="false">

<access-key>my-access-key</access-key>

<secret-key>my-secret-key</secret-key>

<!--optional, default is us-east-1 -->

<region>us-west-1</region>

<!--optional, default is ec2.amazonaws.com. If set, region shouldn't be set as it will override this property -->

<host-header>ec2.amazonaws.com</host-header>

<!-- optional, only instances belonging to this group will be discovered, default will try all running instances -->

<security-group-name>hazelcast-sg</security-group-name>

<tag-key>type</tag-key>

<tag-value>hz-nodes</tag-value>

</aws>

</join>

//这里指定所用通信的网卡,你可以通过命令行指定

<interfaces enabled="false">

<interface>192.168.1.*</interface>

</interfaces>

上面的配置文件中,注解的地方是比较重要的。另外命令行启动参数 **-cluster-host 192.168.1.30** 很多人都不太了解是什么意思,这个host其实是你作为EventBus的消息消费端对外暴露的IP地址,

这个地址最终会被记录到 **AsyncMultiMap** 里,此外如果你的机器有多块网卡,那这个地址也指明了选用哪块网卡作为通信介质,所以我还是建议大家设置一下比较好。

以上是通过命令行的方式启动你的Verticle,下面我看看如何通过代码的方式启动Verticle。

//这个Config是HazelCast的Config类

Config config = new Config();

//实例化一个HazelCastCluster

HazelcastClusterManager mgr = new HazelcastClusterManager();

mgr.setConfig(config);

//定义一个Vertx的配置

VertxOptions options = new VertxOptions().setClusterManager(mgr).setClustered(true);

//获取Vertx实例,采用集群的方式,这里必须通过回调Handler的方式获取。

Ver(options, res -> {

Vertx vertx = res.result();

//do your logic

});

上面其实就是一个最简单的通过代码的方式,手动启动Vertx集群。这里重点的是 VertxOptions 这个配置类,比如这里如何指定EventBus的 cluster host呢?也就是命令行里的 -cluster-host 参数。

这个也是在VertxOptions里指明,通过 ver() 方法即可。

同样的,你可以通过 setHAEanble(true) 来启用HA模式, setHAGroup("groupName") 来指明group的名称。

最后补充一下命令行的方式启动HA:

vertx run MyVerticle -cluster -cluster-host 192.168.1.30 -ha -hagroup myGroup

实现自己的Vertx集群

Vertx-Core其实已经将集群的实现剥离出SPI接口,如果大家看过Vertx-core的源码,就会注意到一个CluterManager接口。

public interface ClusterManager {

Void setVertx(Vertx vertx);

/**

* Return an async multi-map for the given name

*/

<K, V> void getAsyncMultiMap(String name, Handler<AsyncResult<AsyncMultiMap<K, V>>> resultHandler);

/**

* Return an async map for the given name

*/

<K, V> void getAsyncMap(String name, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler);

/**

* Return a synchronous map for the given name

*/

<K, V> Map<K, V> getSyncMap(String name);

void getLockWithTimeout(String name, long timeout, Handler<AsyncResult<Lock>> resultHandler);

void getCounter(String name, Handler<AsyncResult<Counter>> resultHandler);

/**

* Return the unique node ID for this node

*/

String getNodeID();

/**

* Return a list of node IDs corresponding to the nodes in the cluster

*

*/

List<String> getNodes();

/**

* Set a listener that will be called when a node joins or leaves the cluster.

*

* @param listener

*/

void nodeListener(NodeListener listener);

/**

* Join the cluster

*/

void join(Handler<AsyncResult<Void>> resultHandler);

/**

* Leave the cluster

*/

void leave(Handler<AsyncResult<Void>> resultHandler);

/**

* Is the cluster manager active?

*

* @return true if active, false otherwise

*/

boolean isActive();

}

上面就是CluterManager的完整接口,只要实现相关的接口,就可以实现Vertx的集群模式。大家可以看到EventBus就是依赖 getAsyncMultiMap 这个接口来做EventBus地址共享的,此外nodeListener接口 是用来感知集群变动的,比如某个的Vertx节点突然离开了(因为网络的原因,或者主机挂了)。

其他的接口其实很好理解,比如 Lock, AsyncMap, Counter 这些都是用于分布式系统中的协调处理。不同的分布式协调工具都有各自的实现方式,比如Zookeeper就是通过树形结构的方式实现数据存储的, 另外zookeeper通过 curator 这个第三方库把 zookeeper 客户端包装的更容易开发了,默认就提供了 Lock Counter等分布式协调语义。

除了官方的默认 HazelCast实现,还有两个第三方的实现 jgroup, apache-ignite,另外 zookeeper 版已经发布SNAPSHOT版到Maven中央仓库,如果不出意外会与 ver 一起发布。

欢迎加入 Vertx中国用户组QQ群:515203212 交流Vertx技术问题

责任编辑: 鲁达

1.内容基于多重复合算法人工智能语言模型创作,旨在以深度学习研究为目的传播信息知识,内容观点与本网站无关,反馈举报请
2.仅供读者参考,本网站未对该内容进行证实,对其原创性、真实性、完整性、及时性不作任何保证;
3.本站属于非营利性站点无毒无广告,请读者放心使用!

“如何退出rtx群”边界阅读