Java连接Linux执行OS命令

it2023-05-22  65

Java连接Linux执行OS命令

导入maven依赖获取连接&执行命令进一步封装创建Result.java创建OsCmdTask.java创建OsCmd.java 总结拓展问题

导入maven依赖

<!-- https://mvnrepository.com/artifact/ch.ethz.ganymed/ganymed-ssh2 --> <dependency> <groupId>ch.ethz.ganymed</groupId> <artifactId>ganymed-ssh2</artifactId> <version>262</version> </dependency>

获取连接&执行命令

// 获取连接 Connection connection = new Connection(host); ConnectionInfo connect = connection.connect(); // 身份认证 connection.authenticateWithPassword(userName, userPassword); // 获取session执行命令 Session session = connection.openSession(); session.execCommand(command); //执行结果存放在session中 // 成功执行结果:session.getStdout() // 错误执行结果: session.getStderr()

进一步封装

创建Result.java

import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 执行OS命令. */ @Slf4j public class OsCmd { public static void main(String[] args) { String hostname = "127.0.0.1"; String username = "root"; String password = "root"; List<Result> result = new ArrayList<>(); result.add(OsCmd.sendByFutureTask(hostname, username, password, "echo 1")); result.add(OsCmd.sendByFutureTask(hostname, username, password, "echo 2")); result.add(OsCmd.sendByFutureTask(hostname, username, password, "ping baidu.com")); result.add(OsCmd.sendByFutureTask(hostname, username, password, "echo 3")); result.add(OsCmd.sendByFutureTask(hostname, username, password, "echo 4")); System.out.println(new Gson().toJson(result)); } /** * 执行OS命令 */ public static Result sendByFutureTask(String host, String userName, String password, String command) { return OsCmd.sendByFutureTask(host, userName, password, command, 1000, TimeUnit.MILLISECONDS, Boolean.TRUE); } /** * 执行OS命令 */ public static Result sendByFutureTask(String host, String userName, String password, String command, long time, TimeUnit unit, boolean flag) { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 5, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)); FutureTask<Result> future = new FutureTask<>(new Callable<Result>() { @Override public Result call() throws Exception { return new OsCmdTask().send(host, userName, password, command); } }); threadPool.execute(future); Result result = null; try { result = future.get(time, unit); } catch (Exception e) { // true执行失败后不影响其它任务 future.cancel(flag); log.error("OS命令执行失败", e); } return result; } }

创建OsCmdTask.java

import ch.ethz.ssh2.Connection; import ch.ethz.ssh2.ConnectionInfo; import ch.ethz.ssh2.Session; import ch.ethz.ssh2.StreamGobbler; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; /** * OS命令执行实现类 */ @Slf4j public class OsCmdTask { public Result send(String host, String userName, String userPassword, String command) throws IOException { log.info("===开始执行OS命令==="); Result result; Connection connection = null; Session session = null; ConnectionInfo connect = null; try { // 获取连接 connection = new Connection(host); connect = connection.connect(); log.info("开始连接远程主机:{}", host); // 身份认证 boolean isAuthenticated = connection.authenticateWithPassword(userName, userPassword); if (isAuthenticated == false) { log.error("连接远程主机:{} 失败,原因:权限认证失败", host); throw new IOException("权限认证失败"); } // 获取session执行命令 session = connection.openSession(); session.execCommand(command); log.info("连接成功,开始执行OS命令:{}", command); // 收集执行结果 result = dealWithMsg(session); } finally { // 连接异常log if (connect == null) { log.error("远程主机:{} 连接失败,请检查主机地址", host); } //关闭Session if (session != null) { session.close(); } //关闭Connection if (connection != null) { connection.close(); } log.info("===执行OS命令结束==="); } return result; } private boolean checkMsg(StringBuffer buff) { if (buff.length() > Integer.MAX_VALUE) { return true; } return false; } private Result dealWithMsg(Session session) throws IOException { Result result = new Result(); InputStream stdout = new StreamGobbler(session.getStdout()); InputStream stderr = new StreamGobbler(session.getStderr()); BufferedReader stdoutReader = new BufferedReader(new InputStreamReader(stdout)); BufferedReader stderrReader = new BufferedReader(new InputStreamReader(stderr)); StringBuffer stdoutLineBuff = new StringBuffer(); while (true) { String line = stdoutReader.readLine(); if (line == null || checkMsg(stdoutLineBuff)) { break; } stdoutLineBuff.append(line); } result.setOutputMessage(stdoutLineBuff.toString()); StringBuffer stderrLineBuff = new StringBuffer(); while (true) { String line = stderrReader.readLine(); if (line == null || checkMsg(stderrLineBuff)) { break; } stderrLineBuff.append(line); } result.setErrorMessage(stderrLineBuff.toString()); boolean isSuccess = StringUtils.isBlank(result.getErrorMessage()); if (isSuccess) { log.info("OS命令执行成功,返回信息:{}",result.getOutputMessage()); } else { log.info("OS命令执行失败,返回信息:{}",result.getErrorMessage()); } result.setSuccess(isSuccess); return result; } }

创建OsCmd.java

import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 执行OS命令. */ @Slf4j public class OsCmd { public static void main(String[] args) { String hostname = "127.0.0.1"; String username = "root"; String password = "root"; List<Result> result = new ArrayList<>(); result.add(OsCmd.sendByFutureTask(hostname, username, password, "echo 1")); result.add(OsCmd.sendByFutureTask(hostname, username, password, "echo 2")); result.add(OsCmd.sendByFutureTask(hostname, username, password, "ping baidu.com")); result.add(OsCmd.sendByFutureTask(hostname, username, password, "echo 3")); result.add(OsCmd.sendByFutureTask(hostname, username, password, "echo 4")); System.out.println(new Gson().toJson(result)); // 实际工程中不要关闭线程池 threadPool.shutdown(); } public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 5, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)); /** * 执行OS命令 */ public static Result sendByFutureTask(String host, String userName, String password, String command) { return OsCmd.sendByFutureTask(host, userName, password, command, 1000, TimeUnit.MILLISECONDS, Boolean.TRUE); } /** * 执行OS命令 */ public static Result sendByFutureTask(String host, String userName, String password, String command, long time, TimeUnit unit, boolean flag) { FutureTask<Result> future = new FutureTask<>(new Callable<Result>() { @Override public Result call() throws Exception { return new OsCmdTask().send(host, userName, password, command); } }); threadPool.execute(future); Result result = null; try { result = future.get(time, unit); } catch (Exception e) { // true执行失败后不影响其它任务 future.cancel(flag); log.error("OS命令执行失败", e); } return result; } }

总结

将FutureTask交给线程池执行,将可能产生耗时操作的请求用线程池处理,用户线程调用get(long timeout, TimeUnit unit)方法获取执行结果。这样做表面上看起来只是将用户线程的任务转移到了线程池,而且还可能创建创建新线程,看起来并没实际的作用!真的是这样吗? 查看相关源码:

FutureTask中get(long timeout, TimeUnit unit)方法源码: /** * @throws CancellationException {@inheritDoc} */ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }

FutureTask中get(long timeout, TimeUnit unit)方法的核心是调用awaitDone(boolean timed, long nanos)方法

/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet // 超时时间未到,调用线程的yield方法,让出占用的CPU资源 Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }

用户线程被阻塞,让出CPU资源,线程池中的现在执行任务,这样,当同一时刻出现大量的耗时请求时,这些请求会进入线程池的阻塞队列(也可以是无界队列),系统的压力转移到了线程池,只有线程池的线程一直处理这些请求(只是一些请求多了一些等待时间),而系统本身依然是可用状态。 在JVM中的线程都是抢占式调度,在CPU的一个时间片内处理的线程数量有限(CPU时间片轮转),如果使用用户线程去执行,系统内会产生大量的用户线程,这些用户线程短时间内得不到执行,越积累越多,那么上述情况可能短时间内将系统资源耗尽,使系统处于不可用的状态。 当然如果这类任务不会出现上述的短时间内大量请求的场景,那么这种现象也是不会存在的。

拓展问题

FutureTask的类依赖如下:

public class FutureTask<V> implements RunnableFuture<V> { public interface RunnableFuture<V> extends Runnable, Future<V> {

本质上还是使用了java.util.concurrent包下的Future接口,因此可以使用更原始的写法:OsCmdTask 实现 Callable接口,重写call()方法,在OsCmd中使用:

Future<Result> result = threadPool.submit(osCmdTask); try { Result targetResult = result .get(time, unit); } ……

这样虽然也正常执行,但是查看log发现:在线程任务未执行前会抛出TimeOutException,该问题尚未解决。

最新回复(0)