Java连接Linux执行OS命令
导入maven依赖获取连接&执行命令进一步封装创建Result.java创建OsCmdTask.java创建OsCmd.java
总结拓展问题
导入maven依赖
<!-- https://mvnrepository.com/artifact/ch.ethz.ganymed/ganymed-ssh2 -->
<dependency>
<groupId>ch.ethz.ganymed</groupId>
<artifactId>ganymed-ssh2</artifactId>
<version>262</version>
</dependency>
获取连接&执行命令
// 获取连接
Connection connection = new Connection(host);
ConnectionInfo connect = connection.connect();
// 身份认证
connection.authenticateWithPassword(userName, userPassword);
// 获取session执行命令
Session session = connection.openSession();
session.execCommand(command);
//执行结果存放在session中
// 成功执行结果:session.getStdout()
// 错误执行结果: session.getStderr()
进一步封装
创建Result.java
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 执行OS命令.
*/
@Slf4j
public class OsCmd {
public static void main(String[] args) {
String hostname = "127.0.0.1";
String username = "root";
String password = "root";
List<Result> result = new ArrayList<>();
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"echo 1"));
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"echo 2"));
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"ping baidu.com"));
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"echo 3"));
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"echo 4"));
System.out.println(new Gson().toJson(result));
}
/**
* 执行OS命令
*/
public static Result sendByFutureTask(String host, String userName, String password, String command) {
return OsCmd.sendByFutureTask(host, userName, password, command,
1000, TimeUnit.MILLISECONDS, Boolean.TRUE);
}
/**
* 执行OS命令
*/
public static Result sendByFutureTask(String host, String userName, String password, String command,
long time, TimeUnit unit, boolean flag) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1,
5, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
FutureTask<Result> future =
new FutureTask<>(new Callable<Result>() {
@Override
public Result call() throws Exception {
return new OsCmdTask().send(host, userName, password, command);
}
});
threadPool.execute(future);
Result result = null;
try {
result = future.get(time, unit);
} catch (Exception e) {
// true执行失败后不影响其它任务
future.cancel(flag);
log.error("OS命令执行失败", e);
}
return result;
}
}
创建OsCmdTask.java
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.ConnectionInfo;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
/**
* OS命令执行实现类
*/
@Slf4j
public class OsCmdTask {
public Result send(String host, String userName, String userPassword, String command) throws IOException {
log.info("===开始执行OS命令===");
Result result;
Connection connection = null;
Session session = null;
ConnectionInfo connect = null;
try {
// 获取连接
connection = new Connection(host);
connect = connection.connect();
log.info("开始连接远程主机:{}", host);
// 身份认证
boolean isAuthenticated = connection.authenticateWithPassword(userName, userPassword);
if (isAuthenticated == false) {
log.error("连接远程主机:{} 失败,原因:权限认证失败", host);
throw new IOException("权限认证失败");
}
// 获取session执行命令
session = connection.openSession();
session.execCommand(command);
log.info("连接成功,开始执行OS命令:{}", command);
// 收集执行结果
result = dealWithMsg(session);
} finally {
// 连接异常log
if (connect == null) {
log.error("远程主机:{} 连接失败,请检查主机地址", host);
}
//关闭Session
if (session != null) {
session.close();
}
//关闭Connection
if (connection != null) {
connection.close();
}
log.info("===执行OS命令结束===");
}
return result;
}
private boolean checkMsg(StringBuffer buff) {
if (buff.length() > Integer.MAX_VALUE) {
return true;
}
return false;
}
private Result dealWithMsg(Session session) throws IOException {
Result result = new Result();
InputStream stdout = new StreamGobbler(session.getStdout());
InputStream stderr = new StreamGobbler(session.getStderr());
BufferedReader stdoutReader = new BufferedReader(new InputStreamReader(stdout));
BufferedReader stderrReader = new BufferedReader(new InputStreamReader(stderr));
StringBuffer stdoutLineBuff = new StringBuffer();
while (true) {
String line = stdoutReader.readLine();
if (line == null || checkMsg(stdoutLineBuff)) {
break;
}
stdoutLineBuff.append(line);
}
result.setOutputMessage(stdoutLineBuff.toString());
StringBuffer stderrLineBuff = new StringBuffer();
while (true) {
String line = stderrReader.readLine();
if (line == null || checkMsg(stderrLineBuff)) {
break;
}
stderrLineBuff.append(line);
}
result.setErrorMessage(stderrLineBuff.toString());
boolean isSuccess = StringUtils.isBlank(result.getErrorMessage());
if (isSuccess) {
log.info("OS命令执行成功,返回信息:{}",result.getOutputMessage());
} else {
log.info("OS命令执行失败,返回信息:{}",result.getErrorMessage());
}
result.setSuccess(isSuccess);
return result;
}
}
创建OsCmd.java
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 执行OS命令.
*/
@Slf4j
public class OsCmd {
public static void main(String[] args) {
String hostname = "127.0.0.1";
String username = "root";
String password = "root";
List<Result> result = new ArrayList<>();
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"echo 1"));
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"echo 2"));
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"ping baidu.com"));
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"echo 3"));
result.add(OsCmd.sendByFutureTask(hostname, username, password,
"echo 4"));
System.out.println(new Gson().toJson(result));
// 实际工程中不要关闭线程池
threadPool.shutdown();
}
public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1,
5, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
/**
* 执行OS命令
*/
public static Result sendByFutureTask(String host, String userName, String password, String command) {
return OsCmd.sendByFutureTask(host, userName, password, command,
1000, TimeUnit.MILLISECONDS, Boolean.TRUE);
}
/**
* 执行OS命令
*/
public static Result sendByFutureTask(String host, String userName, String password, String command,
long time, TimeUnit unit, boolean flag) {
FutureTask<Result> future =
new FutureTask<>(new Callable<Result>() {
@Override
public Result call() throws Exception {
return new OsCmdTask().send(host, userName, password, command);
}
});
threadPool.execute(future);
Result result = null;
try {
result = future.get(time, unit);
} catch (Exception e) {
// true执行失败后不影响其它任务
future.cancel(flag);
log.error("OS命令执行失败", e);
}
return result;
}
}
总结
将FutureTask交给线程池执行,将可能产生耗时操作的请求用线程池处理,用户线程调用get(long timeout, TimeUnit unit)方法获取执行结果。这样做表面上看起来只是将用户线程的任务转移到了线程池,而且还可能创建创建新线程,看起来并没实际的作用!真的是这样吗? 查看相关源码:
FutureTask中get(long timeout, TimeUnit unit)方法源码:
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
FutureTask中get(long timeout, TimeUnit unit)方法的核心是调用awaitDone(boolean timed, long nanos)方法
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
// 超时时间未到,调用线程的yield方法,让出占用的CPU资源
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
用户线程被阻塞,让出CPU资源,线程池中的现在执行任务,这样,当同一时刻出现大量的耗时请求时,这些请求会进入线程池的阻塞队列(也可以是无界队列),系统的压力转移到了线程池,只有线程池的线程一直处理这些请求(只是一些请求多了一些等待时间),而系统本身依然是可用状态。 在JVM中的线程都是抢占式调度,在CPU的一个时间片内处理的线程数量有限(CPU时间片轮转),如果使用用户线程去执行,系统内会产生大量的用户线程,这些用户线程短时间内得不到执行,越积累越多,那么上述情况可能短时间内将系统资源耗尽,使系统处于不可用的状态。 当然如果这类任务不会出现上述的短时间内大量请求的场景,那么这种现象也是不会存在的。
拓展问题
FutureTask的类依赖如下:
public class FutureTask<V> implements RunnableFuture<V> {
public interface RunnableFuture<V> extends Runnable, Future<V> {
本质上还是使用了java.util.concurrent包下的Future接口,因此可以使用更原始的写法:OsCmdTask 实现 Callable接口,重写call()方法,在OsCmd中使用:
Future<Result> result = threadPool.submit(osCmdTask);
try {
Result targetResult = result .get(time, unit);
} ……
这样虽然也正常执行,但是查看log发现:在线程任务未执行前会抛出TimeOutException,该问题尚未解决。