Netty需要的運行環境很簡單,只有2個。
JDK 1.8+Apache Maven 3.3.9+二、Netty 客戶端/服務器概覽如圖,展示了一個我們將要編寫的 Echo 客戶端和服務器應用程序。該圖展示是多個客戶端同時連接到一臺服務器。所能夠支持的客戶端數量,在理論上,僅受限于系統的可用資源(以及所使用的 JDK 版本可能會施加的限制)。
(資料圖)
Echo 客戶端和服務器之間的交互是非常簡單的;在客戶端建立一個連接之后,它會向服務器發送一個或多個消息,反過來服務器又會將每個消息回送給客戶端。雖然它本身看起來好像用處不大,但它充分地體現了客戶端/服務器系統中典型的請求-響應交互模式。
三、編寫 Echo 服務器所有的 Netty 服務器都需要以下兩部分。
至少一個 ChannelHandler—該組件實現了服務器對從客戶端接收的數據的處理,即它的業務邏輯。引導—這是配置服務器的啟動代碼。至少,它會將服務器綁定到它要監聽連接請求的端口上。3.1 ChannelHandler 和業務邏輯上一篇博文我們介紹了 Future 和回調,并且闡述了它們在事件驅動設計中的應用。我們還討論了 ChannelHandler,它是一個接口族的父接口,它的實現負責接收并響應事件通知。
在 Netty 應用程序中,所有的數據處理邏輯都包含在這些核心抽象的實現中。因為你的 Echo 服務器會響應傳入的消息,所以它需要實現ChannelInboundHandler 接口,用來定義響應入站事件的方法。簡單的應用程序只需要用到少量的這些方法,所以繼承 ChannelInboundHandlerAdapter 類也就足夠了,它提供了ChannelInboundHandler 的默認實現。
我們將要用到的方法是:
channelRead() :對于每個傳入的消息都要調用;channelReadComplete() : 通知ChannelInboundHandler最后一次對channelRead()的調用是當前批量讀取中的最后一條消息;exceptionCaught() :在讀取操作期間,有異常拋出時會調用。該 Echo 服務器的 ChannelHandler 實現是 EchoServerHandler,如代碼:
package com.example.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;/** * @author lhd * @date 2023/05/16 15:05 * @notes Netty Echo服務端簡單邏輯 *///表示channel可以并多個實例共享,它是線程安全的@ChannelHandler.Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; //將消息打印到控制臺 System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8)); //將收到的消息寫給發送者,而不沖刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //將未決消息沖刷到遠程節點,并且關閉該 Channe ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //打印異常堆棧跟蹤 cause.printStackTrace(); //關閉該channel ctx.close(); }}
ChannelInboundHandlerAdapter 有一個直觀的 API,并且它的每個方法都可以被重寫以掛鉤到事件生命周期的恰當點上。
因為需要處理所有接收到的數據,所以我們重寫了 channelRead()方法。在這個服務器應用程序中,我們將數據簡單地回送給了遠程節點。
重寫 exceptionCaught()方法允許我們對 Throwable 的任何子類型做出反應,在這里你記錄了異常并關閉了連接。
雖然一個更加完善的應用程序也許會嘗試從異常中恢復,但在這個場景下,只是通過簡單地關閉連接來通知遠程節點發生了錯誤。
ps:如果不捕獲異常,會發生什么呢?
每個 Channel 都擁有一個與之相關聯的 ChannelPipeline,其持有一個 ChannelHandler 的實例鏈。在默認的情況下,ChannelHandler 會把對它的方法的調用轉發給鏈中的下一個 ChannelHandler。因此,如果 exceptionCaught()方法沒有被該鏈中的某處實現,那么所接收的異常將會被傳遞到 ChannelPipeline 的尾端并被記錄。為此,你的應用程序應該提供至少有一個實現exceptionCaught()方法的 ChannelHandler。
除了 ChannelInboundHandlerAdapter 之外,還有很多需要學習ChannelHandler的子類型和實現。這些之后會一一說明,目前,我們只關注:
針對不同類型的事件來調用 ChannelHandler;應用程序通過實現或者擴展 ChannelHandler 來掛鉤到事件的生命周期,并且提供自定義的應用程序邏輯;在架構上,ChannelHandler 有助于保持業務邏輯與網絡處理代碼的分離。這簡化了開發過程,因為代碼必須不斷地演化以響應不斷變化的需求。3.2 引導服務器下面我們準備開始構建服務器。構建服務器涉及到兩個內容:
綁定到服務器將在其上監聽并接受傳入連接請求的端口;配置 Channel,以將有關的入站消息通知給 EchoServerHandler 實例。package com.example.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import java.net.InetSocketAddress;/** * @author lhd * @date 2023/05/16 15:21 * @notes Netty引導服務器 */public class EchoServer { public static void main(String[] args) throws Exception { //調用服務器的 start()方法 new EchoServer().start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); //創建EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); try { //創建ServerBootstra ServerBootstrap b = new ServerBootstrap(); //指定服務器監視端口 int port = 8080; b.group(group) //指定所使用的 NIO 傳輸 Channel //因為我們正在使用的是 NIO 傳輸,所以你指定了 NioEventLoopGroup 來接受和處理新的連接, // 并且將 Channel 的類型指定為 NioServerSocketChannel 。 .channel(NioServerSocketChannel.class) //使用指定的端口設置套接字地址 //將本地地址設置為一個具有選定端口的 InetSocketAddress 。服務器將綁定到這個地址以監聽新的連接請求 .localAddress(new InetSocketAddress(port)) //添加一個EchoServerHandler 到子Channel的 ChannelPipeline //這里使用了一個特殊的類——ChannelInitializer。這是關鍵。 // 當一個新的連接被接受時,一個新的子 Channel 將會被創建,而 ChannelInitializer 將會把一個你的 //EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中。正如我們之前所解釋的, // 這個 ChannelHandler 將會收到有關入站消息的通知。 .childHandler(new ChannelInitializer(){ @Override public void initChannel(SocketChannel ch) throws Exception { //EchoServerHandler 被標注為 @Shareable,所以我們可以總是使用同樣的實例 //實際上所有客戶端都是使用的同一個EchoServerHandler ch.pipeline().addLast(serverHandler); } }); //異步地綁定服務器,調用 sync()方法阻塞等待直到綁定完成 //sync()方法的調用將導致當前 Thread阻塞,一直到綁定操作完成為止 ChannelFuture f = b.bind().sync(); //獲取 Channel 的CloseFuture,并且阻塞當前線 //該應用程序將會阻塞等待直到服務器的 Channel關閉(因為你在 Channel 的 CloseFuture 上調用了 sync()方法) f.channel().closeFuture().sync(); } finally { //關閉 EventLoopGroup,釋放所有的資源,包括所有被創建的線程 group.shutdownGracefully().sync(); } }}
我們總結一下服務器實現中的重要步驟。下面這些是服務器的主要代碼組件:
EchoServerHandler 實現了業務邏輯;main()方法引導了服務器;引導過程中所需要的步驟如下:創建一個 ServerBootstrap 的實例以引導和綁定服務器;創建并分配一個 NioEventLoopGroup 實例以進行事件的處理,如接受新連接以及讀/寫數據;指定服務器綁定的本地的 InetSocketAddress;使用一個 EchoServerHandler 的實例初始化每一個新的 Channel;調用 ServerBootstrap.bind()方法以綁定服務器。到此我們的引導服務器已經完成。
四、編寫 Echo 客戶端Echo 客戶端將會:(1)連接到服務器;(2)發送一個或者多個消息;(3)對于每個消息,等待并接收從服務器發回的相同的消息;(4)關閉連接。編寫客戶端所涉及的兩個主要代碼部分也是業務邏輯和引導,和你在服務器中看到的一樣。
4.1 通過 ChannelHandler 實現客戶端邏輯如同服務器,客戶端將擁有一個用來處理數據的 ChannelInboundHandler。在這個場景下,我們將擴展 SimpleChannelInboundHandler 類以處理所有必須的任務。這要求重寫下面的方法:
channelActive() : 在到服務器的連接已經建立之后將被調用;channelRead0() : 當從服務器接收到一條消息時被調用;exceptionCaught() :在處理過程中引發異常時被調用。具體代碼可以參考如下:
package com.example.netty;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.CharsetUtil;/** * @author lhd * @date 2023/05/16 15:45 * @notes Netty 簡單的客戶端邏輯 *///標記該類的實例可以被多個 Channel 共享@ChannelHandler.Sharablepublic class EchoClientHandler extends SimpleChannelInboundHandler { //當被通知 Channel是活躍的時候,發送一條消息 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //記錄已接收消息的轉儲 @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); } //在發生異常時,記錄錯誤并關閉Channel @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}
首先,我們重寫了 channelActive()方法,其將在一個連接建立時被調用。這確保了數據將會被盡可能快地寫入服務器,其在這個場景下是一個編碼了字符串"Netty rocks!"的字節緩沖區。
接下來,我們重寫了 channelRead0()方法。每當接收數據時,都會調用這個方法。由服務器發送的消息可能會被分塊接收。也就是說,如果服務器發送了 5 字節,那么不能保證這 5 字節會被一次性接收。即使是對于這么少量的數據,channelRead0()方法也可能會被調用兩次,第一次使用一個持有 3 字節的 ByteBuf(Netty 的字節容器),第二次使用一個持有 2 字節的 ByteBuf。作為一個面向流的協議,TCP 保證了字節數組將會按照服務器發送它們的順序被接收。
ps:所以channelRead0()的調用次數不一定等于服務器發布消息的次數
重寫的第三個方法是 exceptionCaught()。如同在 EchoServerHandler(3.1中的代碼示例)中所示,記錄 Throwable,關閉 Channel,在這個場景下,終止到服務器的連接。
ps:為什么客戶端繼承SimpleChannelInboundHandler 而不是ChannelInboundHandler?
在客戶端,當 channelRead0()方法完成時,我們已經有了傳入消息,并且已經處理完它了。當該方法返回時,SimpleChannelInboundHandler 負責釋放指向保存該消息的 ByteBuf 的內存引用。
在 EchoServerHandler 中,我們仍然需要將傳入消息回送給發送者,而 write()操作是異步的,直到 channelRead()方法返回后可能仍然沒有完成。為此,EchoServerHandler擴展了 ChannelInboundHandlerAdapter,其在這個時間點上不會釋放消息。消息在 EchoServerHandler 的 channelReadComplete()方法中,當 writeAndFlush()方法被調用時被釋放。
4.2 引導客戶端引導客戶端類似于引導服務器,不同的是,客戶端是使用主機和端口參數來連接遠程地址,也就是這里的 Echo 服務器的地址,而不是綁定到一個一直被監聽的端口。
package com.example.netty;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import java.net.InetSocketAddress;/** * @author lhd * @date 2023/05/16 15:59 * @notes 引導客戶端 */public class EchoClient { public void start() throws Exception { //指定 EventLoopGroup 以處理客戶端事件;需要適用于 NIO 的實現 EventLoopGroup group = new NioEventLoopGroup(); try { //創建 Bootstrap Bootstrap b = new Bootstrap(); b.group(group) //適用于 NIO 傳輸的 Channel 類型 .channel(NioSocketChannel.class) //設置服務器的InetSocketAddress .remoteAddress(new InetSocketAddress("127.0.0.1", 8080)) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { //在創建Channel時,向 ChannelPipeline中添加一個 EchoClientHandler 實例 ch.pipeline().addLast(new EchoClientHandler());} }); //連接到遠程節點,阻塞等待直到連接完成 ChannelFuture f = b.connect().sync(); //阻塞,直到Channel 關閉 f.channel().closeFuture().sync(); } finally { //關閉線程池并且釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient().start(); }}
總結一下要點:
為初始化客戶端,創建了一個 Bootstrap 實例;為進行事件處理分配了一個 NioEventLoopGroup 實例,其中事件處理包括創建新的連接以及處理入站和出站數據;為服務器連接創建了一個 InetSocketAddress 實例;當連接被建立時,一個 EchoClientHandler 實例會被安裝到(該 Channel 的)ChannelPipeline 中;在一切都設置完成后,調用 Bootstrap.connect()方法連接到遠程節點;綜上客戶端的構建已經完成。
五、構建和運行 Echo 服務器和客戶端將我們上面的代碼復制到IDEA中運行,先啟動服務端在啟動客戶端會得到以下預期效果:
服務端控制臺打印:客戶端控制臺打印:我們關閉服務端后,客戶端控制臺打印:因為服務端關閉,觸發了客戶端 EchoClientHandler 中的exceptionCaught()方法,打印出了異常堆棧并關閉了連接。
這只是一個簡單的應用程序,但是它可以伸縮到支持數千個并發連接——每秒可以比普通的基于套接字的 Java 應用程序處理多得多的消息。
推薦閱讀
關于我們| 聯系方式| 版權聲明| 供稿服務| 友情鏈接
咕嚕網 www.fyuntv.cn 版權所有,未經書面授權禁止使用
Copyright©2008-2023 By All Rights Reserved 皖ICP備2022009963號-10
聯系我們: 39 60 29 14 2@qq.com