远程过程调用,简单的理解是一个节点请求另一个节点提供的服务
简单理解 RPC就是要像调用本地的函数一样去调远程函数
原理图:
通用接口 用于生产端实现具体业务 与 消费点进行消费
/** * 通用接口 * * @author LionLi */ public interface HelloService { String hello(String mes); }实现通用接口
/** * 接口实现 * * @author LionLi */ public class HelloServiceImpl implements HelloService { private static int count = 0; @Override public String hello(String mes) { System.out.println("收到客户端消息 => " + mes); return "服务端 => 收到消息 [" + mes + "] 第" + (++count) + " 次"; } }生产端启动类 用于启动netty服务器
/** * 生产端启动类 * * @author LionLi */ public class ProviderBootstrap { public static void main(String[] args) { NettyServer.startServer("localhost", 8088); } }对通用接口进行消费调用
/** * 消费端启动类 * * @author LionLi */ public class ClientBootstrap { // 协议头 public static final String providerName = "HelloService#hello#"; public static void main(String[] args) throws Exception{ NettyClient customer = new NettyClient(); // 创建代理对象 HelloService service = (HelloService) customer.getBean(HelloService.class, providerName); while(true) { Thread.sleep(2 * 1000); // 通过代理对象调用服务提供者的方法 String res = service.hello("你好服务端"); System.out.println("结果 => " + res); } } }使用netty构建服务器 用于接收和处理请求
/** * 服务端 * * @author LionLi */ public class NettyServer { public static void startServer(String hostname, int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); } } ); ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync(); System.out.println("服务端正在监听......"); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }服务端业务处理
/** * 服务端业务处理 * * @author LionLi */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("msg => " + msg); // 定义一个协议,比如: "HelloService#hello#你好" if(msg.toString().startsWith(ClientBootstrap.providerName)) { // 截取协议后的内容 String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }用于创建和发送请求 使用代理模式 来创建请求类
/** * 客户端 * * @author LionLi */ public class NettyClient { private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler client; private int count = 0; /** * 代理模式,获取一个代理对象 */ public Object getBean(final Class<?> serivceClass, final String providerName) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serivceClass}, (proxy, method, args) -> { System.out.println("进入代理" + (++count) + " 次"); if (client == null) { initClient(); } // 设置要发给服务器端的信息 // providerName 协议头 // args[0] 是客户端传递的参数 client.setParam(providerName + args[0]); return executor.submit(client).get(); }); } /** * 初始化客户端 */ private static void initClient() { client = new NettyClientHandler(); NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } } ); try { bootstrap.connect("localhost", 8088).sync(); } catch (Exception e) { e.printStackTrace(); } } }业务处理
/** * 客户端业务处理 * * @author LionLi */ public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context; private String result; private String param; @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("初始化连接......"); context = ctx; } @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("读取数据......"); result = msg.toString(); //唤醒等待的线程 notify(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } /** * 被代理对象调用, 发送数据给服务器 */ @Override public synchronized Object call() throws Exception { System.out.println("call 进入"); context.writeAndFlush(param); //等待 channelRead 方法获取到服务器的结果后,唤醒 wait(); System.out.println("call 返回"); return result; } public void setParam(String param) { this.param = param; } }首先启动生产端
启动消费端 查看日志
客户端发送了三次请求 服务端响应了三次
项目已上传到gitee
地址: netty-demo
如果帮到您了,请帮忙点个star