从零学Netty(十一)Netty实现RPC

it2024-12-14  13

简介

远程过程调用,简单的理解是一个节点请求另一个节点提供的服务

简单理解 RPC就是要像调用本地的函数一样去调远程函数

原理图:

实例demo

通用

通用接口  用于生产端实现具体业务  与  消费点进行消费

/** * 通用接口 * * @author LionLi */ public interface HelloService { String hello(String mes); }

生产端

实现通用接口

/** * 接口实现 * * @author LionLi */ public class HelloServiceImpl implements HelloService { private static int count = 0; @Override public String hello(String mes) { System.out.println("收到客户端消息 => " + mes); return "服务端 => 收到消息 [" + mes + "] 第" + (++count) + " 次"; } }

生产端启动类  用于启动netty服务器

/** * 生产端启动类 * * @author LionLi */ public class ProviderBootstrap { public static void main(String[] args) { NettyServer.startServer("localhost", 8088); } }

消费端

对通用接口进行消费调用

/** * 消费端启动类 * * @author LionLi */ public class ClientBootstrap { // 协议头 public static final String providerName = "HelloService#hello#"; public static void main(String[] args) throws Exception{ NettyClient customer = new NettyClient(); // 创建代理对象 HelloService service = (HelloService) customer.getBean(HelloService.class, providerName); while(true) { Thread.sleep(2 * 1000); // 通过代理对象调用服务提供者的方法 String res = service.hello("你好服务端"); System.out.println("结果 => " + res); } } }

服务端

使用netty构建服务器  用于接收和处理请求

/** * 服务端 * * @author LionLi */ public class NettyServer { public static void startServer(String hostname, int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); } } ); ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync(); System.out.println("服务端正在监听......"); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

服务端业务处理

/** * 服务端业务处理 * * @author LionLi */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("msg => " + msg); // 定义一个协议,比如: "HelloService#hello#你好" if(msg.toString().startsWith(ClientBootstrap.providerName)) { // 截取协议后的内容 String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }

客户端

用于创建和发送请求  使用代理模式 来创建请求类

/** * 客户端 * * @author LionLi */ public class NettyClient { private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler client; private int count = 0; /** * 代理模式,获取一个代理对象 */ public Object getBean(final Class<?> serivceClass, final String providerName) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serivceClass}, (proxy, method, args) -> { System.out.println("进入代理" + (++count) + " 次"); if (client == null) { initClient(); } // 设置要发给服务器端的信息 // providerName 协议头 // args[0] 是客户端传递的参数 client.setParam(providerName + args[0]); return executor.submit(client).get(); }); } /** * 初始化客户端 */ private static void initClient() { client = new NettyClientHandler(); NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } } ); try { bootstrap.connect("localhost", 8088).sync(); } catch (Exception e) { e.printStackTrace(); } } }

业务处理

/** * 客户端业务处理 * * @author LionLi */ public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context; private String result; private String param; @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("初始化连接......"); context = ctx; } @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("读取数据......"); result = msg.toString(); //唤醒等待的线程 notify(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } /** * 被代理对象调用, 发送数据给服务器 */ @Override public synchronized Object call() throws Exception { System.out.println("call 进入"); context.writeAndFlush(param); //等待 channelRead 方法获取到服务器的结果后,唤醒 wait(); System.out.println("call 返回"); return result; } public void setParam(String param) { this.param = param; } }

测试

首先启动生产端

启动消费端 查看日志

客户端发送了三次请求  服务端响应了三次

 

项目已上传到gitee

地址: netty-demo

如果帮到您了,请帮忙点个star

最新回复(0)