Netty实战IM即时消息系统(9)客户端登录
零,目录
IM系统简介Netty环境配置服务器启动过程客户端启动过程实战:客户端和服务器端双向通信数据传输载体byteBuf简介客户端和服务器端通信协议编解码器实施客户端登录实施客户端和服务器端发送和接收消息pipeline和CHANEL HANDLER构建客户端和服务器端pipeline数据包拆卸理论和解决方案CHANEL HANDL6 (加入和退出,获取成员列表)发送和接收组聊天消息和优化Netty性能心跳和空闲检测摘要扩展1、登录过程。
1.如上图所示,从客户端连接到服务器端后,1 .客户端创建登录请求对象,然后通过编码将请求对象编码为ByteBuf,并写入服务器端。2.服务器端收到ByteBuf后,首先通过解码将ByteBuf解码为登录请求响应,然后3 .执行服务器端验证。配置登录响应对象,仍对其进行编码,然后写回客户端。4 .客户端收到服务器端响应后解码ByteBuf,以确定在收到登录响应后登录是否成功
二、代码框架
/* *
*实施客户端登录
*
* @author outman
*/
Public class Test_10_实施客户端登录{
public static void main(string[]args){
//服务器端启动
te(8000);
//启动客户端
Te('127.0.0.1 '、8000、5);
}
}
/* *
*客户端
*
* @author outman
*/
Class Test_10_client {
/* *
*启动客户端
*
* @param IP
* IP连接
* @param port
*服务器端口
* @param maxRetry
*最大重试次数
*/
Public static void start (string IP、intport、intmaxretry) {
nioeventloopgroup worker group=new nioeventloopgroup();
boot strap boot strap=new boot strap();
Boo(工作人员组)。频道)
.handler(new channelinitializerniosocketchannel(){
@Override
protected void init channel(niosocketchannel ch)throws exception {
//添加客户端处理逻辑
}
});
//服务器端连接
Connect (bootstrap、IP、端口、max retry);
}
/* *
* @desc连接服务器
* @param bootstrap
* @param IP
* @param port
* @param maxRetry
* @param retryIndex
*重试次数
*/
private static void connect(boot strap boot strap、string IP、int port、int max retry、int).retry index) {
Boo (IP、port)。addlistener (future-{
int[]finalRetryIndex;
//初始化重新连接计数
If==0) {
FinalRetryIndex=new int[]{ 0 };
} else {
FinalRetryIndex=retryIndex
}
//检查连接状态
if(){
sy(' client:' new Date()'连接['IP ':' port']成功);
} else if (maxRetry=0) {
sy(' client:' new Date()'连接['IP ':' port']
失败,达到重连最大次数放弃重连");
} else {
// 重连使用退避算法
int delay = 1 << finalRetryIndex[0];
Sy("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败," + delay + "秒后执行重试");
boo().group().schedule(() -> {
connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);
}, delay, TimeUnit.SECONDS);
}
});
}
}
/**
* 服务端
*
* @author outman
*/
class Test_10_server {
/**
* @desc 服务端启动
* @param port
*/
public static void start(int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBoo(bossGroup, workerGroup).channel)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 添加服务端处理逻辑
}
});
// 绑定端口
bind(serverBootstrap, port);
}
/**
* @desc 自动绑定递增并启动服务端
* @param serverBootstrap
* @param port
*/
private static void bind(ServerBootstrap serverBootstrap, int port) {
(port).addListener(future -> {
if ()) {
Sy("服务端:" + new Date() + "绑定端口【" + port + "】成功");
} else {
Sy("服务端:" + new Date() + "绑定端口【" + port + "】失败,执行递增绑定");
bind(serverBootstrap, port + 1);
}
});
}
}
/**
* 客户端处理逻辑
*
* @author outman
*/
class Test_10_clientHandler extends ChannelInboundHandlerAdapter {
/**
* 连接成功时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 有数据可读时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {
}
}
/**
* 服务端处理逻辑
*
* @author outman
*/
class Test_10_serverHandler extends ChannelInboundHandlerAdapter {
/**
* 连接成功时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 有数据可读时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {
}
}
/**
* 数据包抽象类
*
* @author outman
*/
@Data
abstract class Test_10_Packet {
// 协议版本号
private byte version = 1;
// 获取指定标识
public abstract byte getCommand();
// 指令集合
public interface Command {
// 登录指令
public static final byte LOGIN_REQUEST = 1;
// 登陆响应指令
public static final byte LOGIN_RESPONSE = 2;
}
}
/**
* 序列化抽象接口
*
* @author outman
*/
interface Test_10_Serializer {
// 获取序列化算法标识
byte getSerializerAlgorithm();
// 序列化算法标识集合
interface SerializerAlgorithm {
// JSON 序列化算法标识
public static final byte JSONSerializerAlgrothm = 1;
}
// 默认的序列化算法
public Test_10_Serializer DEFAULT = new Test_10_JSONSerializer();
// 序列化
byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet);
// 反序列化
<T>T deSerialize(byte[] bs, Class<T> clazz);
}
/**
* 数据包编解码类
*
* @author outman
*/
class Test_10_PacketCodec {
// 魔数
private static final int MAGIC_NUMBER = 0x12345678;
// 单例
public static Test_10_PacketCodec INSTANCE = new Test_10_PacketCodec();
// 注册 序列化类
private Class[] serializerArray = new Class[] { Te };
// 注册抽象数据包类
private Class[] packetArray = new Class[] {
Te,
Te };
// 序列化算法标识 和对应的序列化类映射
private static Map<Byte, Class<? super Test_10_Serializer>> serializerMap;
// 指令标识和对应的数据包抽象类映射
private static Map<Byte, Class<? super Test_10_Packet>> packetMap;
// 初始化 两个映射
private Test_10_PacketCodec() {
serializerMap = new HashMap<>();
Arrays.asList(serializerArray).forEach(clazz -> {
try {
Method method = clazz.getMethod("getSerializerAlgorithm");
byte serializerAlgorthm = (byte) me((Test_10_Serializer());
(serializerAlgorthm, clazz);
} catch (Exception e) {
e.printStackTrace();
}
});
packetMap = new HashMap<>();
Arrays.asList(packetArray).forEach(clazz -> {
try {
Method method = clazz.getMethod("getCommand");
me(true);
byte command = (byte) me((Test_10_Packet());
(command, clazz);
} catch (Exception e) {
e.printStackTrace();
}
});
}
// 编码
public ByteBuf enCode(ByteBuf byteBuf, Test_10_Packet packet) {
// 序列化数据包
byte[] bs = Te(byteBuf, packet);
// 写入魔数
by(MAGIC_NUMBER);
// 写入协议版本号
by());
// 写入指令标识
by());
// 写入序列化算法标识
by());
// 写入数据长度
by);
// 写入数据
bys(bs);
return byteBuf;
}
// 解码
public Test_10_Packet deCode(ByteBuf byteBuf) throws Exception {
// 跳过魔数校验
by(4);
// 跳过版本号校验
by(1);
// 获取指令标识
byte command = by();
// 获取序列化算法标识
byte serializerAlgorthm = by();
// 获取数据长度
int len = by();
// 获取数据
byte[] bs = new byte[len];
bys(bs);
// 获取对应的序列化算法类
Test_10_Serializer serializer = getSerializer(serializerAlgorthm);
// 获取对应的数据包类
Test_10_Packet packet = getPacket(command);
if(serializer != null && packet != null) {
//反序列化数据包
return (bs, ());
}else {
throw new RuntimeException("没有找到对应的序列化实现或数据包实现");
}
}
private static Test_10_Packet getPacket(byte command) throws Exception {
return (Test_10_Packet) (command).newInstance();
}
private static Test_10_Serializer getSerializer(byte serializerAlgorthm) throws Exception {
return (Test_10_Serializer) (serializerAlgorthm).newInstance();
}
}
/**
* 登录请求数据包实体类
*
* @author outman
*/
@Data
class Test_10_LoginRequestPacket extends Test_10_Packet {
private int userId ;
private String userName;
private String password;
@Override
public byte getCommand() {
return Command.LOGIN_REQUEST;
}
}
/**
* 登录响应数据包实体类
*
* @author outman
*/
@Data
class Test_10_LoginResponsePacket extends Test_10_Packet {
private int code;
private String msg;
@Override
public byte getCommand() {
return Command.LOGIN_RESPONSE;
}
/**
* 响应码集合
* */
interface Code{
// 成功的响应码
public static final int SUCCESS= 10000;
// 失败的响应码
public static final int FAIL = 10001;
}
}
/**
* Json序列化实现类
*
* @author outman
*/
class Test_10_JSONSerializer implements Test_10_Serializer {
@Override
public byte getSerializerAlgorithm() {
return SerializerAlgori;
}
@Override
public byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet) {
return JSONObject.toJSONBytes(packet);
}
@Override
public <T>T deSerialize(byte[] bs, Class<T> clazz) {
return JSONObject.parseObject(bs, clazz);
}
}
- 这个代码框架中 已经写好了 服务端、 客户端启动连接 。 通信协议 、 数据包编解码的逻辑 , 剩下的客户端、服务端业务处理逻辑 , 我们边学边写, 现在你可以把代码框架粘贴到你的编辑器中
二、 逻辑处理器
- 接下来我们分别实现一下上述四个过程 , 开始之前 , 我们回顾一下客户端与服务端的启动流程 , 客户端启动的时候 , 我们会在引导类BootStrap里配置客户端处理逻辑 , 本小节中我们的客户端业务处理逻辑叫做Test_10_clientHandler
/**
* 客户端处理逻辑
*
* @author outman
*/
class Test_10_clientHandler extends ChannelInboundHandlerAdapter {
/**
* 连接成功时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 有数据可读时触发
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
}
- 我们在客户端启动的时候 , 给客户端引导类配置这个逻辑处理器 , 这样Netty中事件相关的回调就会回调我们的Test_10_clientHandler
boo(workerGroup).channel)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 添加 客户端处理逻辑
ch.pipeline().addLast(new Test_10_clientHandler());
}
});
- 同样 我们给服务端引导类页配置一个逻辑处理器
serverBoo(bossGroup, workerGroup).channel)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 添加服务端处理逻辑
ch.pipeline().addLast(new Test_10_serverHandler());
}
});
- 接下来我们主要围绕这两个Handler来编写客户端登录相关的处理逻辑
三、 客户端发送登录请求
- 客户端处理登录请求
- 我们实现在客户端连接上服务端之后 , 立即登录。 在客户端和服务端连接成功时 , Netty 会回调Test_10_clientHandler 的channelActive(ChannelHandlerContext ctx) 方法 , 我们在这里写 请求登录的逻辑(我们事先在 Test_10_LoginRequestPacket 中添加了三个属性 , Test_10_LoginRequestPacket 类上的@Data 注解是lombok 提供的 ,让我们不用写setter/getter)
/**
* 连接成功时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Sy("客户端:"+new Date()+"开始登陆");
// 创建登陆对象
Test_10_LoginRequestPacket loginRequestPacket = new Test_10_LoginRequestPacket();
// 随机取ID 1~999
loginReque((in()*1000)+1);
loginReque("outman");
loginReque("123456");
// 编码
ByteBuf byteBuf = Te().buffer(), loginRequestPacket);
// 写出数据
c().writeAndFlush(byteBuf);
}
- 写数据的时候 , 我们通过c() 获取到当前连接(Netty对连接 的抽象为channel , 后面小节会分析) , 然后调用了writeAndFlush() 方法 就能把二进制数据写到服务端
- 服务端处理登录请求
/**
* 服务端处理逻辑
*
* @author outman
*/
class Test_10_serverHandler extends ChannelInboundHandlerAdapter {
/**
* 连接成功时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 有数据可读时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {
ByteBuf byteBuf = (ByteBuf) obj;
// 解码
Test_10_Packet packet = Te(byteBuf);
// 根据指令执行对应的处理逻辑
switch () ) {
case Te:
Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;
// 校验成功
Sy("服务端:"+new Date()+"【"+loginReque()+"】 登陆成功");
break;
default:
Sy("服务端:"+new Date()+"收到未知的指令【"+()+"】");
break;
}
}
}
- 我们在服务端引导类 ServerBootstrap 添加了逻辑处理器Test_10_serverHandler 之后 , Netty 在收到数据之后会回调channelRead() , 这里第二个参数msg , 在我们这个场景中 , 可以直接强转为ByteBuf , 为什么Netty不直接把这个参数类型定义为ByteBuf? , 我们在后面的小节会分析到
- 拿到ByteBuf 之后 , 首先要做的事情就是解码 , 解码出的java数据包对象 , 然后判断如果是登陆请求数据包, 就进行登录逻辑的处理这里我们假设所有的登录请求都是成功的 , 接下来, 我们来告诉客户端他登陆成功的好消息。
四、 服务端发送登录响应
- 服务端发送登录响应
/**
* 服务端处理逻辑
*
* @author outman
*/
class Test_10_serverHandler extends ChannelInboundHandlerAdapter {
/**
* 连接成功时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 有数据可读时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {
ByteBuf byteBuf = (ByteBuf) obj;
// 解码
Test_10_Packet packet = Te(byteBuf);
// 根据指令执行对应的处理逻辑
switch () ) {
case Te:
Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;
// 模拟校验成功
Sy("服务端:"+new Date()+"【"+loginReque()+"】 登陆成功");
// 给服务端响应
Test_10_LoginResponsePacket loginResponsePacket = new Test_10_LoginResponsePacket();
loginRe);
loginRe("登陆成功!");
// 编码
byteBuf = Te(byteBuf, loginResponsePacket);
//写出数据
c().writeAndFlush(byteBuf);
break;
default:
Sy("服务端:"+new Date()+"收到未知的指令【"+()+"】");
break;
}
}
}
- 这段逻辑仍然时候服务端逻辑处理器Test_10_serverHandler的channelRead 方法中 , 我们构造一个登录响应包Test_10_LoginResponsePacket , 然后在校验成功和失败时分别设置标志位 , 接下来调用编码器把java对象编码成ByteBuf , 然后调用writeAndFlush 把数据包写给客户端
- 客户端处理登录响应
/**
* 有数据可读时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
// 数据包解码
Test_10_Packet packet= Te(byteBuf);
//根据不同的指令选择对应的处理逻辑
switch ()) {
case Te:
Test_10_LoginResponsePacket loginResponsePacket = (Test_10_LoginResponsePacket) packet;
Sy("客户端:"+new Date() +"收到服务端响应【"+loginRe()+"】");
break;
default:
break;
}
}
- 客户端拿到数据之后 , 调用Test_10_PacketCodec 进行解码操作 , 然后我们打印出服务端的响应内容
- 执行结果
- 完整代码
/**
* 实现客户端登录
*
* @author outman
*/
public class Test_10_实现客户端登录 {
public static void main(String[] args) {
// 启动服务端
Te(8000);
// 启动客户端
Te("127.0.0.1", 8000, 5);
}
}
/**
* 客户端
*
* @author outman
*/
class Test_10_client {
/**
* 客户端启动
*
* @param ip
* 连接ip
* @param port
* 服务端端口
* @param maxRetry
* 最大重试次数
*/
public static void start(String ip, int port, int maxRetry) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
boo(workerGroup).channel)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 添加 客户端处理逻辑
ch.pipeline().addLast(new Test_10_clientHandler());
}
});
// 连接服务端
connect(bootstrap, ip, port, maxRetry);
}
/**
* @desc 连接服务端
* @param bootstrap
* @param ip
* @param port
* @param maxRetry
* @param retryIndex
* 重试计数
*/
private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {
boo(ip, port).addListener(future -> {
int[] finalRetryIndex;
// 初始化 重连计数
if == 0) {
finalRetryIndex = new int[] { 0 };
} else {
finalRetryIndex = retryIndex;
}
// 判断连接状态
if ()) {
Sy("客户端:" + new Date() + "连接【" + ip + ":" + port + "】成功");
} else if (maxRetry <= 0) {
Sy("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败,达到重连最大次数放弃重连");
} else {
// 重连使用退避算法
int delay = 1 << finalRetryIndex[0];
Sy("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败," + delay + "秒后执行重试");
boo().group().schedule(() -> {
connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);
}, delay, TimeUnit.SECONDS);
}
});
}
}
/**
* 服务端
*
* @author outman
*/
class Test_10_server {
/**
* @desc 服务端启动
* @param port
*/
public static void start(int port) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBoo(bossGroup, workerGroup).channel)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 添加服务端处理逻辑
ch.pipeline().addLast(new Test_10_serverHandler());
}
});
// 绑定端口
bind(serverBootstrap, port);
}
/**
* @desc 自动绑定递增并启动服务端
* @param serverBootstrap
* @param port
*/
private static void bind(ServerBootstrap serverBootstrap, int port) {
(port).addListener(future -> {
if ()) {
Sy("服务端:" + new Date() + "绑定端口【" + port + "】成功");
} else {
Sy("服务端:" + new Date() + "绑定端口【" + port + "】失败,执行递增绑定");
bind(serverBootstrap, port + 1);
}
});
}
}
/**
* 客户端处理逻辑
*
* @author outman
*/
class Test_10_clientHandler extends ChannelInboundHandlerAdapter {
/**
* 连接成功时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Sy("客户端:"+new Date()+"开始登陆");
// 创建登陆对象
Test_10_LoginRequestPacket loginRequestPacket = new Test_10_LoginRequestPacket();
// 随机取ID 1~999
loginReque((in()*1000)+1);
loginReque("outman");
loginReque("123456");
// 编码
ByteBuf byteBuf = Te().buffer(), loginRequestPacket);
// 写出数据
c().writeAndFlush(byteBuf);
}
/**
* 有数据可读时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
// 数据包解码
Test_10_Packet packet= Te(byteBuf);
//根据不同的指令选择对应的处理逻辑
switch ()) {
case Te:
Test_10_LoginResponsePacket loginResponsePacket = (Test_10_LoginResponsePacket) packet;
Sy("客户端:"+new Date() +"收到服务端响应【"+loginRe()+"】");
break;
default:
break;
}
}
}
/**
* 服务端处理逻辑
*
* @author outman
*/
class Test_10_serverHandler extends ChannelInboundHandlerAdapter {
/**
* 连接成功时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
/**
* 有数据可读时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {
ByteBuf byteBuf = (ByteBuf) obj;
// 解码
Test_10_Packet packet = Te(byteBuf);
// 根据指令执行对应的处理逻辑
switch () ) {
case Te:
Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;
// 模拟校验成功
Sy("服务端:"+new Date()+"【"+loginReque()+"】 登陆成功");
// 给服务端响应
Test_10_LoginResponsePacket loginResponsePacket = new Test_10_LoginResponsePacket();
loginRe);
loginRe("登陆成功!");
// 编码
byteBuf = Te(byteBuf, loginResponsePacket);
//写出数据
c().writeAndFlush(byteBuf);
break;
default:
Sy("服务端:"+new Date()+"收到未知的指令【"+()+"】");
break;
}
}
}
/**
* 数据包抽象类
*
* @author outman
*/
@Data
abstract class Test_10_Packet {
// 协议版本号
private byte version = 1;
// 获取指定标识
public abstract byte getCommand();
// 指令集合
public interface Command {
// 登录指令
public static final byte LOGIN_REQUEST = 1;
// 登陆响应指令
public static final byte LOGIN_RESPONSE = 2;
}
}
/**
* 序列化抽象接口
*
* @author outman
*/
interface Test_10_Serializer {
// 获取序列化算法标识
byte getSerializerAlgorithm();
// 序列化算法标识集合
interface SerializerAlgorithm {
// JSON 序列化算法标识
public static final byte JSONSerializerAlgrothm = 1;
}
// 默认的序列化算法
public Test_10_Serializer DEFAULT = new Test_10_JSONSerializer();
// 序列化
byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet);
// 反序列化
<T>T deSerialize(byte[] bs, Class<T> clazz);
}
/**
* 数据包编解码类
*
* @author outman
*/
class Test_10_PacketCodec {
// 魔数
private static final int MAGIC_NUMBER = 0x12345678;
// 单例
public static Test_10_PacketCodec INSTANCE = new Test_10_PacketCodec();
// 注册 序列化类
private Class[] serializerArray = new Class[] { Te };
// 注册抽象数据包类
private Class[] packetArray = new Class[] {
Te,
Te };
// 序列化算法标识 和对应的序列化类映射
private static Map<Byte, Class<? super Test_10_Serializer>> serializerMap;
// 指令标识和对应的数据包抽象类映射
private static Map<Byte, Class<? super Test_10_Packet>> packetMap;
// 初始化 两个映射
private Test_10_PacketCodec() {
serializerMap = new HashMap<>();
Arrays.asList(serializerArray).forEach(clazz -> {
try {
Method method = clazz.getMethod("getSerializerAlgorithm");
byte serializerAlgorthm = (byte) me((Test_10_Serializer());
(serializerAlgorthm, clazz);
} catch (Exception e) {
e.printStackTrace();
}
});
packetMap = new HashMap<>();
Arrays.asList(packetArray).forEach(clazz -> {
try {
Method method = clazz.getMethod("getCommand");
me(true);
byte command = (byte) me((Test_10_Packet());
(command, clazz);
} catch (Exception e) {
e.printStackTrace();
}
});
}
// 编码
public ByteBuf enCode(ByteBuf byteBuf, Test_10_Packet packet) {
// 序列化数据包
byte[] bs = Te(byteBuf, packet);
// 写入魔数
by(MAGIC_NUMBER);
// 写入协议版本号
by());
// 写入指令标识
by());
// 写入序列化算法标识
by());
// 写入数据长度
by);
// 写入数据
bys(bs);
return byteBuf;
}
// 解码
public Test_10_Packet deCode(ByteBuf byteBuf) throws Exception {
// 跳过魔数校验
by(4);
// 跳过版本号校验
by(1);
// 获取指令标识
byte command = by();
// 获取序列化算法标识
byte serializerAlgorthm = by();
// 获取数据长度
int len = by();
// 获取数据
byte[] bs = new byte[len];
bys(bs);
// 获取对应的序列化算法类
Test_10_Serializer serializer = getSerializer(serializerAlgorthm);
// 获取对应的数据包类
Test_10_Packet packet = getPacket(command);
if(serializer != null && packet != null) {
//反序列化数据包
return (bs, ());
}else {
throw new RuntimeException("没有找到对应的序列化实现或数据包实现");
}
}
private static Test_10_Packet getPacket(byte command) throws Exception {
return (Test_10_Packet) (command).newInstance();
}
private static Test_10_Serializer getSerializer(byte serializerAlgorthm) throws Exception {
return (Test_10_Serializer) (serializerAlgorthm).newInstance();
}
}
/**
* 登录请求数据包实体类
*
* @author outman
*/
@Data
class Test_10_LoginRequestPacket extends Test_10_Packet {
private int userId ;
private String userName;
private String password;
@Override
public byte getCommand() {
return Command.LOGIN_REQUEST;
}
}
/**
* 登录响应数据包实体类
*
* @author outman
*/
@Data
class Test_10_LoginResponsePacket extends Test_10_Packet {
private int code;
private String msg;
@Override
public byte getCommand() {
return Command.LOGIN_RESPONSE;
}
/**
* 响应码集合
* */
interface Code{
// 成功的响应码
public static final int SUCCESS= 10000;
// 失败的响应码
public static final int FAIL = 10001;
}
}
/**
* Json序列化实现类
*
* @author outman
*/
class Test_10_JSONSerializer implements Test_10_Serializer {
@Override
public byte getSerializerAlgorithm() {
return SerializerAlgori;
}
@Override
public byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet) {
return JSONObject.toJSONBytes(packet);
}
@Override
public <T>T deSerialize(byte[] bs, Class<T> clazz) {
return JSONObject.parseObject(bs, clazz);
}
}
五、 总结
- 本小节我们梳理了客户端登录的基本流程 , 然后结合上一小节的编解码逻辑 , 我们使用Netty 完成了完整的客户端登录流程。
六、 思考
- 客户端登录成功或失败之后 , 如何把成功或者失败的标识绑定在客户端的连接上 ? 服务端又是怎样有效的避免客户端重新登录的?
- 答: 给channel设置attr自定义属性 , 可以把登录标识绑定在连接上
- 客户端NioEventLoopGroup不用释放吗?
- 答: 不用 , 程序关闭之后 , 所有的线程都自动关闭了
后记
- 感谢各位客官的阅读 , 如果文章中有错误或剖析的不够透彻还请您能够给不吝赐教在评论中告诉小编 , 以便小编能够及时调整文章内容 , 为大家带来更加优质的文章
- 如果想阅读更多有趣的文章,结交更多有趣的灵魂可以关注公众号