Netty实战IM即时消息系统(6)实战:客户端和服务器端双向通信
零,目录
IM系统简介Netty环境配置服务器启动过程实践:客户端和服务器端双向通信数据传输载体byteBuf简介客户端和服务器端通信协议编解码器实施客户端登录实施客户端和服务器端发送和接收消息pipeline和CHANEL HANDLER构建客户端和服务器端pipeline数据包拆解理论和解决方案chann ElHandler 获取成员列表)发送和接收组聊天消息和优化Netty性能扩展心跳和空闲检测摘要6、实战:客户端和服务器双向通信。
本节想要实现的功能是:在客户端连接成功后向服务器写入数据,从服务器接收数据后打印,以及对客户端的数据段做出响应。
我们先做一个代码框架 , 然后在框架上面做修改public class Test_07_客户端和服务端双向通信 {
public static void main(String[] args) {
Te(8000);
Te("127.0.0.1" , 8000 ,5);
}
}
class Test_07_Client{
public static void start(String IP , int port ,int maxRetry){
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
boo(workerGroup)
.channel)
.handler(new ChannelInitializer<NioSocketchannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
}
});
connect(bootstrap , IP , port , maxRetry);
}
private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry , int... retryIndex) {
boo(IP , port).addListener(future ->{
int[] finalRetryIndex;
i()) {
Sy("连接成功");
}else if(maxRetry ==0) {
Sy("达到最大重试此时,放弃重试");
}else {
// 初始化 重试计数
i == 0) {
finalRetryIndex = new int[]{0};
}else {
finalRetryIndex = retryIndex;
}
// 计算时间间隔
int delay = 1 << finalRetryIndex[0];
// 执行重试
Sy(new Date() +" 连接失败,剩余重试次数:"+ maxRetry + ","+delay+"秒后执行重试");
boo().group().schedule(()->{
connect(bootstrap , IP, port , maxRetry -1 , finalRetryIndex[0]+1);
}, delay, TimeUnit.SECONDS);
}
});
}
}
class Test_07_Server{
public static void start(int port){
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBoo(bossGroup, workerGroup).channel)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new Test_07_ServerHandler());
}
});
bind(serverBootstrap, port);
}
private static void bind(ServerBootstrap serverBootstrap, int port) {
serverBootstrap
.bind(port)
.addListener(future -> {
i()) {
Sy("服务端:端口【"+port+"】绑定成功!");
}else {
Sy("服务端:端口【"+port+"】绑定失败,尝试绑定【"+(port+1)+"】!");
bind(serverBootstrap, port+1);
}
});
}
}
- 客户端发送数据到服务端
- 在《客户端启动流程》这一小节 , 我们提到 客户端相关的数据读写逻辑是通过BootStrap的handler()方法指定
boo(workerGroup).channel)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
}
});
- 现在我们在initChannel()中给客户端添加一个逻辑处理器 , 这个处理器的作用就是负责向服务端写数据
boo(workerGroup).channel)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 添加业务处理逻辑 可以添加自定义的业务处理逻辑也可以添加 Netty自带的简单通用的处理逻辑
ch.pipeline().addLast(new Test_07_ClientHandler());
}
});
- ch.pipeline()方法返回的是和这条连接相关的逻辑处理链 , 采用了责任链处理模式 , 这里不理解没关系 , 后面会讲到。
- 然后再调用addLast()方法添加一个逻辑处理器 , 这个逻辑处理器为的就是在客户端建立连接成功之后向服务端写数据 , 下面是这个逻辑处理器的代码:
class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Sy(new Date() + " 客户端写出数据...");
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx);
// 2. 写数据
c().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 获取二进制抽象 ByteBuffer
ByteBuf buf = c().buffer();
// 准备数据
byte[] bs = "你好,奥特曼!".getByte("UTF-8"));
// 把数据填充到 buf
buf.writeBytes(bs);
return buf;
}
}
- 这个逻辑处理器继承自ChannelInboundHandlerAdapter ,然后覆盖了channelActive()方法 , 这个方法会在客户端连接建立成功之后被调用
- 客户端连接建立成功之后 , 调用channelActive() , 在这个方法里面 , 我们编写向服务端写数据的逻辑
- 向服务端写数据分为两步 , 首先我们要获取一个netty对二进制数据抽象的二进制ByteBuf , 上面代码中c() 获取一个ByteBuf的内存管理器 , 这个内存管理器的作用就是分配一个ByteBuf , 然后我们把字符串的二进制数据填充到ByteBuf , 这样我们就获取到了Netty需要的一个数据格式, 最后我们调用c().writeAndFlush()把数据写到服务端。
- 以上就是 向服务端写数据的逻辑 , 和传统的socket 编程不同的是 , Netty 里面的数据是以ByteBuf为单位的 , 所有需要写出的数据必须塞到一个ByteBuf里 , 需要读取的数据也是如此。
- 服务端读取客户端数据
- 服务端的数据处理逻辑 是通过ServerBootStrap 的childHandler()方法指定
(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// TODO Auto-generated method stub
}
})
- 现在 , 我们在initChannel() 中 给服务端添加一个逻辑处理器 , 这个处理器 的作用就是负责客户端读数据
(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new Test_07_ServerHandler());
}
})
- 这个方法里的逻辑和客户端类似 , 获取服务端关于这条连接的逻辑处理链pipeline , 然后添加一个逻辑处理器 , 负责读取客户端发来的数据
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
Sy(new Date() + ": 服务端读到数据->"+ buf.toString("UTF-8")));
}
}
- 服务端的逻辑处理器同样是继承自ChannelInboundHandlerAdapter , 与客户端不同的是 , 这里覆盖的方法是 channelRead() ,这个方法在接收到数据之后会被回调
- 这里的msg 值的是Netty里面数据读写的载体 , 为什么不直接是ByteBuf , 而需要我们强转一下 , 我们后面会分析道 , 这里我们强转之后 , 然后调用bu() 就能够拿到我们客户端发过来的字符串数据。
- 运行测试
- 完整代码
import java.nio.c;
import java.u;
import java.u;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
public class Test_07_客户端和服务端双向通信 {
public static void main(String[] args) throws Exception {
Te(8000);
Te("127.0.0.1", 8000, 5);
}
}
class Test_07_Client {
public static void start(String IP, int port, int maxRetry) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
boo(workerGroup).channel)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 添加业务处理逻辑 可以添加自定义的业务处理逻辑也可以添加 Netty自带的简单通用的处理逻辑
ch.pipeline().addLast(new Test_07_ClientHandler());
}
});
connect(bootstrap, IP, port, maxRetry);
}
private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {
boo(IP, port).addListener(future -> {
int[] finalRetryIndex;
if ()) {
Sy("客户端连接【"+IP+":"+port+"】成功");
} else if (maxRetry == 0) {
Sy("达到最大重试此时,放弃重试");
} else {
// 初始化 重试计数
if == 0) {
finalRetryIndex = new int[] { 0 };
} else {
finalRetryIndex = retryIndex;
}
// 计算时间间隔
int delay = 1 << finalRetryIndex[0];
// 执行重试
Sy(new Date() + " 连接失败,剩余重试次数:" + maxRetry + "," + delay + "秒后执行重试");
boo().group().schedule(() -> {
connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);
}, delay, TimeUnit.SECONDS);
}
});
}
}
class Test_07_Server {
public static void start(int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBoo(bossGroup, workerGroup).channel)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new Test_07_ServerHandler());
}
});
bind(serverBootstrap, port);
}
private static void bind(ServerBootstrap serverBootstrap, int port) {
serverBootstrap
.bind(port)
.addListener(future -> {
i()) {
Sy("服务端:端口【"+port+"】绑定成功!");
}else {
Sy("服务端:端口【"+port+"】绑定失败,尝试绑定【"+(port+1)+"】!");
bind(serverBootstrap, port+1);
}
});
}
}
class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String content = "你好,奥特曼!";
Sy(new Date() + " 客户端写出数据:"+content);
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx , content);
// 2. 写数据
c().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {
// 获取二进制抽象 ByteBuffer
ByteBuf buf = c().buffer();
// 准备数据
byte[] bs = content.getByte("UTF-8"));
// 把数据填充到 buf
buf.writeBytes(bs);
return buf;
}
}
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
Sy(new Date() + ": 服务端读到数据->"+ buf.toString("UTF-8")));
}
}
- 运行结果:
- 服务端回复数据给客户端
- 服务端向客户端写数据的逻辑与客户端向服务端写数据的逻辑一样 , 先创建一个ByteBuf , 然后填充二进制数据 , 最后调用writeAndFlush()方法写出去
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
Sy(new Date() + ": 服务端读到数据->"+ buf.toString("UTF-8")));
// 向客户端回复数据
String content = "你好,田先森!";
Sy(new Date() +":服务端写出数据-> "+content);
ByteBuf byteBuf = getByteBuf(ctx , content);
c().writeAndFlush(byteBuf);
}
private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {
// 获取 二进制抽象 ByteBuf
ByteBuf byteBuf = cxt.alloc().buffer();
// 准备数据
byte[] bs = content.getByte("UTF-8"));
// 把数据填充到buf中
by(bs);
return byteBuf;
}
}
- 现在轮到客户端了 , 客户端读取数据的逻辑和服务端读数据的逻辑一样 , 同样是覆盖channelRead() 方法
class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
Sy(new Date()+": 客户端读到数据 ->"+ by("UTF-8")));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String content = "你好,奥特曼!";
Sy(new Date() + " 客户端写出数据:"+content);
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx , content);
// 2. 写数据
c().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {
// 获取二进制抽象 ByteBuffer
ByteBuf buf = c().buffer();
// 准备数据
byte[] bs = content.getByte("UTF-8"));
// 把数据填充到 buf
buf.writeBytes(bs);
return buf;
}
}
- 现在 客户端和服务端就实现了双向通信
- 完整代码:
import java.nio.c;
import java.u;
import java.u;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
import io.ne;
public class Test_07_客户端和服务端双向通信 {
public static void main(String[] args) throws Exception {
Te(8000);
Te("127.0.0.1", 8000, 5);
}
}
class Test_07_Client {
public static void start(String IP, int port, int maxRetry) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
boo(workerGroup).channel)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 添加业务处理逻辑 可以添加自定义的业务处理逻辑也可以添加 Netty自带的简单通用的处理逻辑
ch.pipeline().addLast(new Test_07_ClientHandler());
}
});
connect(bootstrap, IP, port, maxRetry);
}
private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {
boo(IP, port).addListener(future -> {
int[] finalRetryIndex;
if ()) {
Sy("客户端连接【"+IP+":"+port+"】成功");
} else if (maxRetry == 0) {
Sy("达到最大重试此时,放弃重试");
} else {
// 初始化 重试计数
if == 0) {
finalRetryIndex = new int[] { 0 };
} else {
finalRetryIndex = retryIndex;
}
// 计算时间间隔
int delay = 1 << finalRetryIndex[0];
// 执行重试
Sy(new Date() + " 连接失败,剩余重试次数:" + maxRetry + "," + delay + "秒后执行重试");
boo().group().schedule(() -> {
connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);
}, delay, TimeUnit.SECONDS);
}
});
}
}
class Test_07_Server {
public static void start(int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBoo(bossGroup, workerGroup).channel)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new Test_07_ServerHandler());
}
});
bind(serverBootstrap, port);
}
private static void bind(ServerBootstrap serverBootstrap, int port) {
serverBootstrap
.bind(port)
.addListener(future -> {
i()) {
Sy("服务端:端口【"+port+"】绑定成功!");
}else {
Sy("服务端:端口【"+port+"】绑定失败,尝试绑定【"+(port+1)+"】!");
bind(serverBootstrap, port+1);
}
});
}
}
class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
Sy(new Date()+": 客户端读到数据 ->"+ by("UTF-8")));
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String content = "你好,奥特曼!";
Sy(new Date() + " 客户端写出数据:"+content);
// 1. 获取数据
ByteBuf buffer = getByteBuf(ctx , content);
// 2. 写数据
c().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {
// 获取二进制抽象 ByteBuffer
ByteBuf buf = c().buffer();
// 准备数据
byte[] bs = content.getByte("UTF-8"));
// 把数据填充到 buf
buf.writeBytes(bs);
return buf;
}
}
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
Sy(new Date() + ": 服务端读到数据->"+ buf.toString("UTF-8")));
// 向客户端回复数据
String content = "你好,田先森!";
Sy(new Date() +":服务端写出数据-> "+content);
ByteBuf byteBuf = getByteBuf(ctx , content);
c().writeAndFlush(byteBuf);
}
private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {
// 获取 二进制抽象 ByteBuf
ByteBuf byteBuf = cxt.alloc().buffer();
// 准备数据
byte[] bs = content.getByte("UTF-8"));
// 把数据填充到buf中
by(bs);
return byteBuf;
}
}
- 执行结果
- 总结
- 本小节中 , 我们了解到客户端和服务端的逻辑处理均是在启动的时候 , 通过给逻辑处理链pipeline添加逻辑处理器 , 来编写数据的处理逻辑 , pipeline的逻辑我们会在后面分析
- 接下来我们学到了 在客户端连接成功之后会回调逻辑处理器的channelActive()方法 , 而不管是服务端还是客户端 , 收到数据之后都会调用channelRead方法
- 写数据用writeAndFlush() 方法 客户端与服务端交互的二进制数据载体为ByteBuf , ByteBuf 通过连接的内存管理器创建 , 字节数据填充到ByteBuf 之后才能写到对端 , 接下来一小节 , 我们就重点来分析ByteBuf
- 思考: 如何实现在新连接介入的时候 , 服务端主动向客户端推送消息 , 客户端回复服务端消息?
- 解答: 在服务器端的逻辑处理其中也实现 channelActive() 在有新的连接接入时 会回调此方法
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String content = "是不是你连我了?";
Sy(new Date() +":服务端写出数据-> "+content);
ByteBuf byteBuf = getByteBuf(ctx , content);
c().writeAndFlush(byteBuf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
Sy(new Date() + ": 服务端读到数据->"+ buf.toString("UTF-8")));
// 向客户端回复数据
String content = "你好,田先森!";
Sy(new Date() +":服务端写出数据-> "+content);
ByteBuf byteBuf = getByteBuf(ctx , content);
c().writeAndFlush(byteBuf);
}
private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {
// 获取 二进制抽象 ByteBuf
ByteBuf byteBuf = cxt.alloc().buffer();
// 准备数据
byte[] bs = content.getByte("UTF-8"));
// 把数据填充到buf中
by(bs);
return byteBuf;
}
}
后记
感谢各位客官的阅读 , 如果文章中有错误或剖析的不够透彻还请您能够给不吝赐教在评论中告诉小编 , 以便小编能够及时调整文章内容 , 为大家带来更加优质的文章