rpc接口并发调用实例

it2023-12-31  64

问题背景 需要根据id通过rpc调用查询具体信息,因为没有提供批量查询的接口,所以做法是挨个遍历查询,那意味着: 如果有100个id,就需要顺序进行100次rpc调用,假设每次rpc接口的调用时间是50ms(这个速度很快了),那单单rpc调用就要占用5s,所以接口的响应会非常慢。下面进行优化。

优化方案: 方案一:让服务方提供批量查询接口,需要服务提供方配合,这里暂不采用。 方案二:rpc服务的调用由顺序调用修改为并行调用,采用线程池实现rpc的并发调用。

具体实现如下: 1)创建线程的类 public class MyThreadFactory implements ThreadFactory {     @Override     public Thread newThread(Runnable r) {         return new Thread(r);     } }

2)创建线程处理类,因为需要获取rpc调用的结果所以是实现callable类 说明:这里需要获取spring提供的bean,获取方式是通过参数传入 public class RegisterTask implements Callable {

    private String  registerId;     private String  cityCode;     // 这里需要获取spring提供的bean,获取方式是通过参数传入     private RegisterClient registerClient;     public RegisterTask(String registerId, RegisterClient registerClient, String  cityCode){         this.registerId = registerId;         this.cityCode = cityCode;         this.registerClient = registerClient;     }

    @Override     public List<StudentLessonDto> call() throws Exception {         // 这里就是进行rpc调用         return registerClient.queryLessonsByRegistId(registerId, cityCode);     } }

3)方法getDetail()内部创建线程池         ThreadFactory namedThreadFactory = new MyThreadFactory();         int queueCapacity = 10000, corePoolSize = 10, maximumPoolSize = 10;         ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(queueCapacity);         ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 10, TimeUnit.SECONDS, arrayBlockingQueue, namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

4) 方法getDetail()内部线程池提交线程,并获取执行的结果         // 线程池进行rpc调用         ArrayList<Future<List<StudentLessonDto>>> futures = new ArrayList<>();         for (String id : registerIdList) {             Future future = threadPoolExecutor.submit(new RegisterTask(id, registerClient, cityCode));             futures.add(future);         }

        // 获取线程调用的结果         for (Future future : futures) {             try {                 List<StudentLessonDto> list = (List<StudentLessonDto>) future.get();                 if (!CollectionUtils.isEmpty(list)) {                     result.put(list.get(0).getRegistId(), list);                 }             } catch (InterruptedException e) {                 log.error("并发获取报名的讲次异常", e);             } catch (ExecutionException e) {                 log.error("并发获取报名的讲次异常", e);             }         }

之后经过测试和上线。

结果: 第一天没啥问题,第二天开始有问题,主要现象如下: 1) 服务调用异常,异常信息大致如下: org.springframework.web.util.NestedServletException: Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached     at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1055)     at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)     at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)     at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)     at javax.servlet.http.HttpServlet.service(HttpServlet.java:503)     at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)     at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)     at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:74)     at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(     ....... Caused by: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached     at java.base/java.lang.Thread.start0(Native Method)

2)机器无法登陆,异常如下: root:fork failed: Cannot allocate memory

内存耗用的一干二净,root都没有内存了。最后重启机器才行。

下面定位问题: 最近新上的代码,肯定是我的新增的多线程部分除了内存,耗费了大量内存,但是怎么解释呢?? 说明肯定是线程资源没有得到释放,但是我是在方法内部创建的线程池,方法执行后按道理对应的线程池和线程资源应该会释放的(我个人错误的理解)。

但是看到一篇博客(https://blog.csdn.net/lbh199466/article/details/104934207/), 因为核心线程一直没有释放,所以对应的线程池和线程资源并没有释放。

栈中的引用对象就是一种GcRoots, 所以如果核心线程一直不被回收,那么对应的线程与对象资源都不会被回收。线程栈和线程中的对象占用的对象都不会释放。引用关系:ThreadPoolExecutor->Worker->thread,然后因为thread一直不释放,所以对应的worker和池资源也都会不会释放。 所以: 方法内部定义线程池,核心线程数不为零,核心线程不会被回收,导致相关内存资源都不会被释放。 也可以参考:https://zhuanlan.zhihu.com/p/72515308

查看java进程中的线程的个数 验证上面的结论,参考https://blog.csdn.net/blueheart20/article/details/78905267(获取当前进程数的方法) 获取当前服务器上java进程的线程个数。下面挺一种实例 jps 获取java进程pid top -Hp pid 获取进程中的线程个数 如 Threds:1190 total,代表进程中有1190个线程,可以的确看到当前线程数很多

最终解决方案: 1)参考 https://www.cnblogs.com/qxynotebook/p/7398882.html 在线程池中,有核心线程,对于核心线程超时也回收,所以,需要确保核心线程超时之后也被回收。 解决办法:在结果返回之前设置核心线程也回收: threadPoolExecutor.allowCoreThreadTimeOut(true); 2)参考:https://blog.csdn.net/wchgogo/article/details/78185643(unable to create new native thread) 栈对内存的消耗:目前没有像堆那样指定最大占用内存,设置Xss指点单个线程的占用内存大小,默认线程占用空间时1M,可以设置为512k。

服务器中影响最多线程数的因素: 1)内存,线程肯定是占用内存的,如果内存耗尽,那自然不能继续创建线程。 单个线程占用内存大小可通过-Xss设置,现在默认1M,一般建议512k就够了。 如果Xss设置过大,则浪费内存空间; 如果Xss设置过小,代码中有遍历或递归导致调用太深的时候,就有可能耗尽StackSpace,爆出StackOverflow的错误;

2)机器设置的最大线程数 操作系统会限制进程允许创建的线程数,使用ulimit -u命令查看限制。某些服务器上此阈值设置的过小,比如1024。

最新回复(0)