首页 养生问答 疾病百科 养生资讯 女性养生 男性养生

刚学习netty和java,不会用,但现在急需,您有没有用netty编写的服务器...

发布网友 发布时间:2022-04-25 16:24

我来回答

3个回答

懂视网 时间:2022-04-09 05:53

之前介绍了Netty天然的几种解析器,也稍微介绍了一下ByteToMessageDecoder类,我们对Netty的解码器还是有了一定的了解~


今天要介绍的是Netty中一个很重要的解码器,因为相比于其他的普通的解码器,这个解码器用的场景更多,并不是说其他解码器不重要,只是因为我们业务场景所致


在当今比较流行的水平拆分的架构之下,RPC协议很是流行,这样可以使各个项目解耦,使得更加灵活,每个项目之间通过远程调用交互,相互之间定义一个通讯私有协议,然后解析,这样就可以进行数据接口交互


例如我们定义一个这样的协议类:

package com.lyncc.netty.codec.lengthFieldBasedFrame;

public class CustomMsg {
 
 //类型 系统编号 0xAB 表示A系统,0xBC 表示B系统
 private byte type;
 
 //信息标志 0xAB 表示心跳包 0xBC 表示超时包 0xCD 业务信息包
 private byte flag;
 
 //主题信息的长度
 private int length;
 
 //主题信息
 private String body;
 
 public CustomMsg() {
 
 }
 
 public CustomMsg(byte type, byte flag, int length, String body) {
 this.type = type;
 this.flag = flag;
 this.length = length;
 this.body = body;
 }

 public byte getType() {
 return type;
 }

 public void setType(byte type) {
 this.type = type;
 }

 public byte getFlag() {
 return flag;
 }

 public void setFlag(byte flag) {
 this.flag = flag;
 }

 public int getLength() {
 return length;
 }

 public void setLength(int length) {
 this.length = length;
 }

 public String getBody() {
 return body;
 }

 public void setBody(String body) {
 this.body = body;
 }

}
我们规定两个系统通过Netty去发送这样的一个格式的信息,CustomMsg中包含这样的几类信息:

1)type表示发送端的系统类型

2)flag表示发送信息的类型,是业务数据,还是心跳包数据

3)length表示主题body的长度

4)body表示主题信息


有了这样的相互规定,发送端与接收端按照这种格式去编码和解码数据,这样就很容易的进行数据交互,当然如果netty不提供任何的类,我们也能进行编码解码,但是Netty还是提供了一个现有的类,这样可以避免我们重复造车,并且即使我们愿意重复造车,我们造的车也不一定比Netty好,所以我们还是直接使用吧


Netty提供的类叫做LengthFieldBasedFrameDecoder,与其他的解码器不一致的地方是它需要几个参数作为它的构造函数参数:


这几个参数的详细解析可以见如下文档:

http://blog.163.com/linfenliang@126/blog/static/127857195201210821145721/


我们也仔细说明一下这些参数,加入我们需要解析,加入我们需要解析我们刚才定义的CustomMsg,我们需要自定义一个decoder,这个类继承Netty提供的LengthFieldBasedFrameDecoder:

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class CustomDecoder extends LengthFieldBasedFrameDecoder {
 
 //判断传送客户端传送过来的数据是否按照协议传输,头部信息的大小应该是 byte+byte+int = 1+1+4 = 6
 private static final int HEADER_SIZE = 6;
 
 private byte type;
 
 private byte flag;
 
 private int length;
 
 private String body;

 /**
 * 
 * @param maxFrameLength 解码时,处理每个帧数据的最大长度
 * @param lengthFieldOffset 该帧数据中,存放该帧数据的长度的数据的起始位置
 * @param lengthFieldLength 记录该帧数据长度的字段本身的长度
 * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数
 * @param initialBytesToStrip 解析的时候需要跳过的字节数
 * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
 */
 public CustomDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
  int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
 super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
  lengthAdjustment, initialBytesToStrip, failFast);
 }
 
 @Override
 protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
 if (in == null) {
  return null;
 }
 if (in.readableBytes() < HEADER_SIZE) {
  throw new Exception("可读信息段比头部信息都小,你在逗我?");
 }
 
 //注意在读的过程中,readIndex的指针也在移动
 type = in.readByte();
 
 flag = in.readByte();
 
 length = in.readInt();
 
 if (in.readableBytes() < length) {
  throw new Exception("body字段你告诉我长度是"+length+",但是真实情况是没有这么多,你又逗我?");
 }
 ByteBuf buf = in.readBytes(length);
 byte[] req = new byte[buf.readableBytes()];
 buf.readBytes(req);
 body = new String(req, "UTF-8");
 
 CustomMsg customMsg = new CustomMsg(type,flag,length,body);
 return customMsg;
 }

}
头部信息的大小我们这边写的是6,原因在代码里面也解释了,byte是一个字节,int是四个字节,那么头部大小就是6个字节,接下来就是要定义构造函数了,构造函数的入参的解释代码里已经标注了,我们真实的入参是:

技术分享

稍微解释一下:

1)LENGTH_FIELD_LENGTH指的就是我们这边CustomMsg中length这个属性的大小,我们这边是int型,所以是4

2)LENGTH_FIELD_OFFSET指的就是我们这边length字段的起始位置,因为前面有type和flag两个属性,且这两个属性都是byte,两个就是2字节,所以偏移量是2

3)LENGTH_ADJUSTMENT指的是length这个属性的值,假如我们的body长度是40,有时候,有些人喜欢将length写成44,因为length本身还占有4个字节,这样就需要调整一下,那么就需要-4,我们这边没有这样做,所以写0就可以了


好了,以下给出完整的代码:

CustomServer.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

public class CustomServer {
 
 private static final int MAX_FRAME_LENGTH = 1024 * 1024;
 private static final int LENGTH_FIELD_LENGTH = 4;
 private static final int LENGTH_FIELD_OFFSET = 2;
 private static final int LENGTH_ADJUSTMENT = 0;
 private static final int INITIAL_BYTES_TO_STRIP = 0;

 private int port;
 
 public CustomServer(int port) {
 this.port = port;
 }
 
 public void start(){
 EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 EventLoopGroup workerGroup = new NioEventLoopGroup();
 try {
  ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
   .childHandler(new ChannelInitializer<SocketChannel>() {
   
   protected void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline().addLast(new CustomDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
    ch.pipeline().addLast(new CustomServerHandler());
   };
   
   }).option(ChannelOption.SO_BACKLOG, 128) 
   .childOption(ChannelOption.SO_KEEPALIVE, true);
  // 绑定端口,开始接收进来的连接
  ChannelFuture future = sbs.bind(port).sync(); 
  
  System.out.println("Server start listen at " + port );
  future.channel().closeFuture().sync();
 } catch (Exception e) {
  bossGroup.shutdownGracefully();
  workerGroup.shutdownGracefully();
 }
 }
 
 public static void main(String[] args) throws Exception {
 int port;
 if (args.length > 0) {
  port = Integer.parseInt(args[0]);
 } else {
  port = 8080;
 }
 new CustomServer(port).start();
 }
}
CustomServerHandler.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class CustomServerHandler extends SimpleChannelInboundHandler<Object> {

 @Override
 protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
 if(msg instanceof CustomMsg) {
  CustomMsg customMsg = (CustomMsg)msg;
  System.out.println("Client->Server:"+ctx.channel().remoteAddress()+" send "+customMsg.getBody());
 }
 
 }

}
CustomClient.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class CustomClient {
 
 static final String HOST = System.getProperty("host", "127.0.0.1");
 static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
 static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

 public static void main(String[] args) throws Exception {

 // Configure the client.
 EventLoopGroup group = new NioEventLoopGroup();
 try {
  Bootstrap b = new Bootstrap();
  b.group(group)
  .channel(NioSocketChannel.class)
  .option(ChannelOption.TCP_NODELAY, true)
  .handler(new ChannelInitializer<SocketChannel>() {
   @Override
   public void initChannel(SocketChannel ch) throws Exception {
   ch.pipeline().addLast(new CustomEncoder());
   ch.pipeline().addLast(new CustomClientHandler());
   }
  });

  ChannelFuture future = b.connect(HOST, PORT).sync();
  future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
  future.channel().closeFuture().sync();
 } finally {
  group.shutdownGracefully();
 }
 }

}
CustomClientHandler.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class CustomClientHandler extends ChannelInboundHandlerAdapter {
 
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
 CustomMsg customMsg = new CustomMsg((byte)0xAB, (byte)0xCD, "Hello,Netty".length(), "Hello,Netty");
 ctx.writeAndFlush(customMsg);
 }

}
最最重要的就是两个译码器:

CustomDecoder.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class CustomDecoder extends LengthFieldBasedFrameDecoder {
 
 //判断传送客户端传送过来的数据是否按照协议传输,头部信息的大小应该是 byte+byte+int = 1+1+4 = 6
 private static final int HEADER_SIZE = 6;
 
 private byte type;
 
 private byte flag;
 
 private int length;
 
 private String body;

 /**
 * 
 * @param maxFrameLength 解码时,处理每个帧数据的最大长度
 * @param lengthFieldOffset 该帧数据中,存放该帧数据的长度的数据的起始位置
 * @param lengthFieldLength 记录该帧数据长度的字段本身的长度
 * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数
 * @param initialBytesToStrip 解析的时候需要跳过的字节数
 * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异常
 */
 public CustomDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
  int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
 super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
  lengthAdjustment, initialBytesToStrip, failFast);
 }
 
 @Override
 protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
 if (in == null) {
  return null;
 }
 if (in.readableBytes() < HEADER_SIZE) {
  throw new Exception("可读信息段比头部信息都小,你在逗我?");
 }
 
 //注意在读的过程中,readIndex的指针也在移动
 type = in.readByte();
 
 flag = in.readByte();
 
 length = in.readInt();
 
 if (in.readableBytes() < length) {
  throw new Exception("body字段你告诉我长度是"+length+",但是真实情况是没有这么多,你又逗我?");
 }
 ByteBuf buf = in.readBytes(length);
 byte[] req = new byte[buf.readableBytes()];
 buf.readBytes(req);
 body = new String(req, "UTF-8");
 
 CustomMsg customMsg = new CustomMsg(type,flag,length,body);
 return customMsg;
 }

}
CustomEncoder.java

package com.lyncc.netty.codec.lengthFieldBasedFrame;

import java.nio.charset.Charset;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class CustomEncoder extends MessageToByteEncoder<CustomMsg> {

 @Override
 protected void encode(ChannelHandlerContext ctx, CustomMsg msg, ByteBuf out) throws Exception {
 if(null == msg){
  throw new Exception("msg is null");
 }
 
 String body = msg.getBody();
 byte[] bodyBytes = body.getBytes(Charset.forName("utf-8"));
 out.writeByte(msg.getType());
 out.writeByte(msg.getFlag());
 out.writeInt(bodyBytes.length);
 out.writeBytes(bodyBytes);
 
 }

}
好了,到此为止代码就全部写完了,运行测试一下:

启动服务器端:

技术分享

运行客户端后,再回到服务器端的控制台:

技术分享


一起学Netty(九)之LengthFieldBasedFrameDecoder

标签:

热心网友 时间:2022-04-09 03:01

(一)handler处理篇
首先,是handler,初次接触netty的朋友要注意,handler不是一个单例.即每个channel下都会有自己的一个handler实例.

public class ServerHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(
ServerHandler.class.getName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();

@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
logger.log(Level.INFO, "Channel state changed: {0}", e);
}
super.handleUpstream(ctx, e);
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
System.out.println(this);
String request = (String) e.getMessage();
//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
//服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
if (request.toLowerCase().equals("bye")) {
ChannelFuture future = e.getChannel().write("bye\r\n");
future.addListener(ChannelFutureListener.CLOSE);
} else {
//以下是我初步解析客户端发送过来的数据,然后决定处理方式
RecevieData receivedaData = MessageDecoder.decode(request);
if (null != receivedaData) {
//服务器第5版
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//然后判断命令是否存在
for (String s : CommandCode.COMMANDS) {
if (s.equals(receivedaData.getActionType())) {
COMMAND_FLAG.set(true);
if (s.equals(CommandCode.KEEP_ALIVE)) {
serverChannelGroup.addChannel(e.getChannel());
}
break;
} else {
COMMAND_FLAG.set(false);
}
}
if (COMMAND_FLAG.get()) {
COMMAND_FLAG.set(false);
//将这个命令传递给下一个handler来处理.
//这里的"下一个handler"即为用户自己定义的处理handler
ctx.sendUpstream(e);
} else {
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
}
} else {
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果格式错误,那么直接返回
e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
}
}

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
}
}

在上面这个handler中,我使用了ctx.sendUpstream(e);来处理,个人觉得此处为了要实现执行运行时代码,也可以使用接口等方式.但既然netty提供了sendUpstream 的方法,我们用这个岂不是更方便^_^
下面是使用SSL连接的handler

public class ServerSSLHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(
ServerSSLHandler.class.getName());
private final ThreadLocal<Boolean> COMMAND_FLAG = new ThreadLocal<Boolean>();
private final ServerChannelGroup serverChannelGroup = ServerChannelGroup.newInstance();

@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof ChannelStateEvent) {
logger.log(Level.INFO, "Channel state changed: {0}", e);
}
super.handleUpstream(ctx, e);
}

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
//ssl握手
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
sslHandler.handshake();
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
System.out.println(this);
String request = (String) e.getMessage();
//如果接受到客户端发送的bye指令,那么就给客户端回复一个bye指令,客户端接受到后,主动关闭连接
//服务器端通过ChannelFutureListener.CLOSE,当它认为客户端已经接受到服务器发送的bye后,也主动关闭连接
if (request.toLowerCase().equals("bye")) {
ChannelFuture future = e.getChannel().write("bye\r\n");
future.addListener(ChannelFutureListener.CLOSE);
} else {
//以下是我初步解析客户端发送过来的数据,然后决定处理方式
RecevieData receivedaData = MessageDecoder.decode(request);
if (null != receivedaData) {
//服务器第5版
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//然后判断命令是否存在
for (String s : CommandCode.COMMANDS) {
if (s.equals(receivedaData.getActionType())) {
COMMAND_FLAG.set(true);
if (s.equals(CommandCode.KEEP_ALIVE)) {
serverChannelGroup.addChannel(e.getChannel());
}
break;
} else {
COMMAND_FLAG.set(false);
}
}
if (COMMAND_FLAG.get()) {
COMMAND_FLAG.set(false);
//将这个命令传递给下一个handler来处理.
//这里的"下一个handler"即为用户自己定义的处理handler
ctx.sendUpstream(e);
} else {
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.NOT_FOUND, StatusCode.NOT_FOUND_TEXT));
}
} else {
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED, StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果格式错误,那么直接返回
e.getChannel().write(MessageEncoder.encode(receivedaData, null, null));
}
}

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
logger.log(Level.WARNING, "Server side Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
ListenerManager.getListener(ConnectClosedByPeerListener.class).connectClosedByPeer(e.getCause());
}
}
当我们有了2个handler后,当然就是要把他们添加到我们的Pipeline中

public class ServerPipelineFactory implements
ChannelPipelineFactory {

public ChannelPipeline getPipeline() {
ChannelPipeline pipeline = pipeline();
ServerConfig config = ServerConfig.getInstance();
try {
if (config.ssl()) {
SSLEngine engine =
SecureSslContextFactory.getServerContext().createSSLEngine();
//说明是服务器端SslContext
engine.setUseClientMode(false);
pipeline.addLast("ssl", new SslHandler(engine));
}

//此Decoder可以自动解析一句以\r\n结束的命令,我为了方便,也用了这个Decoder
//使用这个Decoder,我不用刻意发送命令长度用于解析,只要没有收到\r\n说明数据还
//没有发送完毕.这个Decoder会等到收到\r\n后调用下个handler
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
8192, Delimiters.lineDelimiter()));
//字串解码,可以自己设置charset
pipeline.addLast("decoder", new StringDecoder());
//字串编码,可以自己设置charset
pipeline.addLast("encoder", new StringEncoder());

if (config.ssl()) {
//如果开启了SSL,那么使用sslhandler
pipeline.addLast("sslhandler", new ServerSSLHandler());
} else {
//如果没有开启SSL,那么使用普通handler
pipeline.addLast("handler", new ServerHandler());
}
//遍历配置文件中的服务器handler,将其添加进Pipeline链中
for (Element e : config.handler()) {
pipeline.addLast(e.attribute(e.getQName("id")).getValue().trim(),
(ChannelHandler) Class.forName(e.attribute(e.getQName("class")).getValue().trim()).newInstance());
}
} catch (DocumentException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (InstantiationException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (IllegalAccessException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
} catch (ClassNotFoundException ex) {
Logger.getLogger(ServerPipelineFactory.class.getName()).log(Level.SEVERE, ex.getMessage(), ex);
}
return pipeline;
}
}
server.xml,放到lib下即可,注意其中的handler 以及clienthandler 项,如果你新建了自己的handler,那么需要在此xml中配置一下.

<?xml version="1.0" encoding="UTF-8"?>

<root>
<!-- 配置主机地址 -->
<host>127.0.0.1</host>
<!-- 配置服务端口 -->
<port>8080</port>
<!-- 是否启用ssl,1为启用,0为停用 -->
<ssl>0</ssl>

<!--服务器业务handler -->
<handler id="timeHandler" class="com.chinatenet.nio.server.handler.ServerTimeHandler" />

<!--客户端业务handler -->
<clienthandler id="timeHandler" class="com.chinatenet.nio.client.handler.ClientTimeHandler" />
</root>
到此,一个简单的可扩展handler的服务器雏形就出来了

下面,我们添加一个自定义的服务器处理handler进来

public class ServerTimeHandler extends SimpleChannelUpstreamHandler {

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
RecevieData receivedaData = MessageDecoder.decode((String) e.getMessage());
if (CommandCode.GET_TIME.equals(receivedaData.getActionType())
|| CommandCode.KEEP_ALIVE.equals(receivedaData.getActionType())) {
if (VersionCode.V5.equals(receivedaData.getVersion())) {
//回复客户端后,即可进行自己的业务.当然.这里可以根据需要,看
//是先回复再处理还是等处理结果出来后,将结果返回客户端
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.OK,
System.currentTimeMillis() / 1000 + ""));
} else {
//版本错误
e.getChannel().write(MessageEncoder.encode(receivedaData, StatusCode.VERSION_NOT_SUPPORTED,
StatusCode.VERSION_NOT_SUPPORTED_TXET));
}
} else {
//如果不是此handler处理的命令,那么流下去
ctx.sendUpstream(e);
}
}

}

最后测试一下

public class Server {

public static void main(String[] args) throws DocumentException {
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));

bootstrap.setPipelineFactory(new ServerPipelineFactory());

//因为我要用到长连接
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);

ServerConfig config = ServerConfig.getInstance();
bootstrap.bind(new InetSocketAddress(Integer.valueOf(config.port())));
}
}

总结:在整个服务器编码中,刚开始会遇到"远程主机强迫关闭了一个现有的连接。"等这类错误,最后修改成"相互告知对方我要关闭了"再进行关闭就可以了.

热心网友 时间:2022-04-09 04:19

不好意思,没有哦

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com