Netty实现Echo服务器和客户端

使用Netty实现一个Echo服务器和客户端。


Echo 服务器

  • 所有的Netty服务器都需要如下两个部分,
    • 至少一个ChannelHandler – 处理从客户端接收的数据,代表业务逻辑。
    • 引导– 配置服务器的启动代码

ChannelHandler和业务逻辑

  • ChannelInboundHandlerAdapter的每个方法都可以被重写以挂钩到事件生命周期的恰当点上channelRead()在每条消息到达时调用,其他方法包括注册,连接激活失活,读取,异常事件等。具体方法列表:io.netty.channel.ChannelInboundHandlerAdapter
  • 每个Channel都有一个与之关联的ChannelPipeline, 其持有一个ChannelHandler的实例链,默认情况下,ChannelHandler会把对它的方法的调用转发给链中的下一个ChannelHandler,因此,如果exceptionCaught()方法没有被链中的某处实现,那么所接收的异常将会被传递到ChannelPipeline的尾端并被记录。所以,应用程序中至少提供一个实现了exceptionCaught()方法的ChannelHandler
  • 关于ChannelHandler
    • 针对不同类型的事件来调用ChannelHandler
    • 应用程序通过实现或者拓展ChannelHandler来挂钩到事件的生命周期,并且提供自定义的应用程序逻辑。
    • ChannelHandler有助于保持业务逻辑和网络处理的分离。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Author by darcy
* Date on 17-5-30 下午9:28.
* Description:
* 继承ChannelInboundHandlerAdapter, 用来定义响应入站(InBound)事件的方法
*/
// 标志该Handler可以被多个Handler安全的共享。
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
// 接收的消息发给发送者, 但是不flush.
ctx.write(msg);
}

// Notifies the handler that the last call made to channelRead() was the last message in the current batch
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 消息flush到远程节点并且关闭该channel。
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 记录异常并且关闭连接
cause.printStackTrace();
ctx.close();
}
}

引导服务器

  • 本例使用的是NIO, 强可拓展性和彻底的异步性,目前使用最为广泛。如果要使用Blocked IO, 可以使用OioServerSocketChannel和OioEventLoopGroup。
  • NioEventLoopGroup 来接收和处理新的连接,channel的类型指定为NioServerSocketChannel;
  • ChannelInitializer,当新的连接到来时, 一个新的子Channel将会被创建, 而ChannelInitializer将会把EchoServerhandler(自定义的业务逻辑处理)添加到该子handler的ChannelPipeline中。EchoServerChannel处理入站消息。
  • 引导过程:
    • 创建 ServerBootstrap 的实例来引导和绑定服务器。
    • 创建 NioEventLoopGroup 进行事件的处理,包括接收新的连接,处理读写
    • 指定服务器绑定的本地地址InetSocketAddress
    • 使用同一个EchoServerHandler实例初始化每一个新的Channel
    • bind()绑定服务器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class EchoServer {
private final int port;

public EchoServer(int port) {
this.port = port;
}

public static void main(String[] args) throws InterruptedException {
// 示例代码中的port配置在了pom文件中。
int port = 8000;
new EchoServer(port).start();
}

private void start() throws InterruptedException {
EchoServerHandler handler = new EchoServerHandler();
// 使用NIO传输,指定NioEventLoopGroup来接收和处理新的连接.
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
// channel的类型指定为NioServerSocketChannel--相当于JDK中的ServerSocket.
.channel(NioServerSocketChannel.class)
// 设置本地地址, 服务器将绑定到这个地址监听新的连接请求.
.localAddress(new InetSocketAddress(port))
// 当新的连接被接受时, 一个新的子Channel将会被创建, 而ChannelInitializer将会把EchoServerhandler
// 添加到该子handler的ChannelPipeline中。
.childHandler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(handler);

}
});
// 绑定服务器并等待绑定完成。
ChannelFuture future = bootstrap.bind().sync(); // sync()会阻塞当前线程,直到绑定完成.
// 阻塞直到服务器的Channel关闭。
future.channel().closeFuture().sync(); // channel的CloseFuture上调用sync()方法.
} finally {
// 关闭EventLoopGroup,并且释放所有的资源,包括被创建的线程。
group.shutdownGracefully().sync();
}
}
}
> output
Server received:Netty, yoyo.

Echo 客户端

ChannelHandler处理客户端逻辑

  • channelActive 在到服务器的连接建立以后被调用
  • channelRead0 从服务器接收到消息时候调用
  • SimpleChannelInboundHandler vs ChannelInboundHandlerAdapter
    • 关系到业务逻辑如何处理消息以及Netty如何管理资源。
    • client: channelRead0调用完成的时候,已经处理了所传入的消息。同时 SimpleChannelInboundHandler 负责释放执行保存该消息的ByteBuf的内存引用。
    • server:需要将传入消息发送给发送方,但是write操作是异步的,也就是说channelRead()方法可能返回后write操作也没有完成,所以,EchoServerHandler拓展了ChannelInboundHandlerAdapter,它在这个时间点上是不会释放消息的。消息在channelReadCompleted()方法中,当writeAndFlush()方法调用之时被释放。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

/**
* 到服务器的连接已经建立之后将被调用。
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty, yoyo.", CharsetUtil.UTF_8));
}

/**
* 从服务器接收到一条消息时被调用。服务器发送的消息可能分块接收。
* @param channelHandlerContext
* @param byteBuf
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println("Received:" + byteBuf.toString(CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

引导client

  • 这里同样使用了NIO传输。可以分别在client和服务器上分别使用不同的传输。
  • 引导过程
    • 为初始化客户端,创建了一个Bootstrap实例。
    • 创建了NioEventLoopGroup, 进行事件处理,包括创建新的连接,处理入站和出站数据
    • 为连接服务器创建InetSocketAddress
    • 当连接被建立时,一个EchoClientHandler实例会被安装到该Chnanel的ChannelPipeline中。
    • connect()连接到远程节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class EchoClient {
private final String host;
private final int port;

public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public static void main(String[] args) throws InterruptedException {
String host = "localhost";
int port = 8000;
new EchoClient(host, port).start();
}

private void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
// 指定EventLoopGroup处理客户端事件,需要适用于NIO的实现.
bootstrap.group(group)
// 适用于NIO传输的Channel类型。
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
// 创建Channel时候,向该ChannelPipeline中添加一个EchoClientHandler实例,进行client端的数据处理.
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
// 连接到远程节点,阻塞,直到连接完成。
ChannelFuture future = bootstrap.connect().sync();
// 阻塞直到Channel关闭
future.channel().closeFuture().sync();
} finally {
// 关闭线程池并释放资源。
group.shutdownGracefully().sync();
}
}
}
> output
Received:Netty, yoyo.