Netty自定义编码器MessageToMessageEncoder类
在Netty的实践中很多场景都会用到自定义报文,那么MessageToMessageEncoder编码器,几乎比如会遇到。
从类名层面来说,第二个Message可以理解为任意一个对象。如果是使用ByteBuf对象的话,就和MessageToByte的原理一样。
使用时,要在MessageToMessageDecoder\<ByteBuf>的解码器里面对ByteBuf类型的对象进行解码操作,解析为所需的格式或报文。
本文以字符串的编码和解析为实例(关注公众号:程序新视界,回复“1006”,接收完整实例和更多代码)来进行讲解,通过自定义编码器和解码器,便可以在Netty中便直接发送和接收String类型的数据。
解码器的本质就是将原始字符数据与自定义的消息对象进行转换。网络中数据的传输都是以字节码形式的,client编码发送到server,server进行解码,反之一样。因此,编码器和解码器往往是成对出现的。
了解了上述基本原理,下面来看看具体实现代码。
首先看编码器部分StringEncoder:
/**
* 自定义字符串编码器
* 来源:公众号,程序新视界
* @author sec
* @version 1.0
* @date 2020/12/22
**/
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
private final Charset charset;
public StringEncoder() {
this(Charset.defaultCharset());
}
public StringEncoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) {
// 发送消息为空直接返回
if (msg.length() == 0) {
return;
}
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg),
this.charset));
}
}
对应的解码器StringDecoder:
/**
* 字符串解码器
* 来源:公众号,程序新视界
* @author sec
* @version 1.0
* @date 2020/12/22
**/
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
private final Charset charset;
public StringDecoder() {
this(Charset.defaultCharset());
}
public StringDecoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg,
List<Object> out) {
out.add(msg.toString(this.charset));
}
}
其中,编码器和解码器都需要在client和server中进行配置指定,先看server端代码:
/**
* 服务器端
*
* 来源:公众号,程序新视界
* @author sec
* @version 1.0
* @date 2020/12/22
**/
public class Server {
public static void main(String[] args) throws Exception {
int port = 9998;
new Server().bind(port);
}
public void bind(int port) throws Exception {
// 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接
// 另一个线程组用于处理SocketChannel的网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// NIO服务器端的辅助启动类 降低服务器开发难度
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
// 类似NIO中serverSocketChannel
.channel(NioServerSocketChannel.class)
// 配置TCP参数
.option(ChannelOption.SO_BACKLOG, 1024)
// 设置tcp缓冲区
.option(ChannelOption.SO_BACKLOG, 1024)
// 设置发送缓冲大小
.option(ChannelOption.SO_SNDBUF, 32 * 1024)
// 这是接收缓冲大小
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
// 保持连接
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
// 网络事件处理器
@Override
protected void initChannel(SocketChannel channel) {
// 增加自定义的编码器和解码器
channel.pipeline()
.addLast(new StringEncoder())
.addLast(new StringDecoder())
// 服务端的处理器
.addLast(new ServerHandler());
}
});
// 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler
ChannelFuture f = serverBootstrap.bind(port).sync();
System.out.println("Server启动");
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出 释放线程池资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
System.out.println("服务器优雅的释放了线程资源…");
}
}
}
然后对应Handler:
/**
* 服务器端处理器
*
* 来源:公众号,程序新视界
* @author sec
* @version 1.0
* @date 2020/12/22
**/
public class ServerHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 接受客户端的数据
String body = (String) msg;
System.out.println(body.length());
System.out.println("收到Client信息:[" + body +"]");
// 服务端,回写数据给客户端,直接回写整形的数据
String data = "Hello ,I am Server …";
ctx.writeAndFlush(data);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}
client端对应请求:
/**
* 客户端
*
* 来源:公众号,程序新视界
* @author sec
* @version 1.0
* @date 2020/12/22
**/
public class Client {
public static void main(String[] args) throws Exception {
new Client().connect(9998, "127.0.0.1");
}
/**
* 连接服务器
*/
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
// 客户端辅助启动类 对客户端配置
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
// 网络事件处理器
@Override
protected void initChannel(SocketChannel channel) {
// 增加自定义的编码器和解码器
channel.pipeline()
.addLast(new StringEncoder())
.addLast(new StringDecoder())
// 客户端的处理器
.addLast(new ClientHandler());
}
});
// 异步链接服务器 同步等待链接成功
ChannelFuture f = b.connect(host, port).sync();
System.out.println(f);
// 发送消息
Thread.sleep(1000);
f.channel().writeAndFlush("Hello ");
f.channel().writeAndFlush("World ");
Thread.sleep(2000);
f.channel().writeAndFlush("Netty ");
// 等待链接关闭
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
System.out.println("客户端优雅的释放了线程资源...");
}
}
}
client对应的handler:
/**
* 客户端处理器
*
* 来源:公众号,程序新视界
* @author sec
* @version 1.0
* @date 2020/12/22
**/
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
String body = (String) msg;
System.out.println("Client :" + body);
// 只是读数据,没有写数据的话
// 需要自己手动的释放的消息
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
然后依次运行Server和Client即可进行通信。

关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台
除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接
本文链接:http://choupangxia.cn/2020/12/23/netty-messagetomessageencoder/