JMeter 源码解读 [10] - 分布式核心类

上次我们已经分析了一下JMeter的分布式架构,今天我们来看一下其中相关的核心类是怎么工作的,我们还是先从Master 开始,Master 是用来接收用户控制的JMeter Engine, 它接收 GUI 和 NoneGUI 两种运行模式,我们先来看一下 JMete.java 里面是怎么处理分布式模式的, 看一下start 函数是怎么工作的

else if (parser.getArgumentById(SERVER_OPT) != null) {
// Start the server
try {
RemoteJMeterEngineImpl.startServer(RmiUtils.getRmiRegistryPort()); // $NON-NLS-1$
startOptionalServers();
} catch (Exception ex) {
System.err.println("Server failed to start: "+ex);//NOSONAR
log.error("Giving up, as server failed with:", ex);
throw ex;
}

当命令行包含一个-s参数的时候,表示我们要启动一个Slave Server , 实际启动的工作是交给RemoteJMeterEngineImpl

###RemoteJMeterEngineImpl
RemoteJMeterEngineImpl 这个类实现了RemoteJMeterEngine接口,而RemoteJMeterEngine接口继承自 Remote 接口,我们知道在 RMI 通信框架里提供的 Remote 接口,就是用来声明继承这个接口的interface 定义的方法是用来远程调用的,RemoteJMeterEngine 定义的接口和JMeterEngine定义的一样,唯一的区别是每个方法前多了一个r,例如在 JMeterEngine 里定义的叫 configure的方法在RemoteJMeterEngine里定义的是rconfigure

public interface RemoteJMeterEngine extends Remote {
void rconfigure(HashTree testTree, String host, File jmxBase, String scriptName) throws RemoteException;

void rrunTest() throws RemoteException, JMeterEngineException;

void rstopTest(boolean now) throws RemoteException;

void rreset() throws RemoteException;

void rsetProperties(HashMap<String,String> p) throws RemoteException;

void rexit() throws RemoteException;
}

我们在来看 RemoteJMeterEngineImpl 的实现,首先在上次的分析中谈过,真正的执行逻辑是通过在内部自带的一个 JMeterEngine 来实现,可以理解为一个 JMeter Slave 就是一个不提供外部交互interface, 只允许通过 RMI 协议来访问的特殊 JMeter, 它自身的处理逻辑就是一个标准的 JMeterEngine, 我们简单看一个 RemoteJMeterEngine 的接口实现,我们看一下rexit的实现

@Override
public void rexit() throws RemoteException {
log.info("Exiting");
// Bug 59400 - allow rexit() to return
Thread et = new Thread(() -> {
log.info("Stopping the backing engine");
backingEngine.exit();
});
et.setDaemon(false);
// Tidy up any objects we created
Registry reg = LocateRegistry.getRegistry(
RmiUtils.getRmiHost().getHostName(),
this.rmiRegistryPort,
RmiUtils.createClientSocketFactory());
try {
reg.unbind(JMETER_ENGINE_RMI_NAME);
} catch (NotBoundException e) {
log.warn("{} is not bound", JMETER_ENGINE_RMI_NAME, e);
}
log.info("Unbound from registry");
// Help with garbage control
JMeterUtils.helpGC();
et.start();
}

它的处理逻辑分几步

  1. 启动一个线程,调用 StandardJMeterEngine 自身的方法backingEngine.exit();来关闭 JMeter , 然后把自己从 RMI Service 中注销掉 reg.unbind(JMETER_ENGINE_RMI_NAME);

###DistributedRunner
刚刚谈了 Slave 的运行逻辑,再来看看 Master JMeter 怎么工作的,Master 在整个 RMI 通信框架中实际是一个 Client, 我们先还是从用户入口代码来看,看一下 JMeter 的 runNonGui 函数,在最下面有这么几行代码

java.util.StringTokenizer st = new java.util.StringTokenizer(remoteHostsString, ",");//$NON-NLS-1$
List<String> hosts = new LinkedList<>();
while (st.hasMoreElements()) {
hosts.add((String) st.nextElement());
}

DistributedRunner distributedRunner=new DistributedRunner(this.remoteProps);
distributedRunner.setStdout(System.out); // NOSONAR
distributedRunner.setStdErr(System.err); // NOSONAR
distributedRunner.init(hosts, clonedTree);
engines.addAll(distributedRunner.getEngines());
distributedRunner.start();

这里创建了一个 DistributeRunner ,看起来是一个 远程调用的 stub ,先看一下 distributedRunner.init 在干什么

while (idx < addrs.size()) {
String address = addrs.get(idx);
println("Configuring remote engine: " + address);
JMeterEngine engine = getClientEngine(address.trim(), tree);
if (engine != null) {
engines.put(address, engine);
addrs.remove(address);
} else {
println("Failed to configure " + address);
idx++;
}
}

它接收输入的多组Slave 的IP 和 Port ,然后会创建对应的(1:1)的 ClientJMeterEngine,并保存在 engines 这个容器里,等实际runTest的时候,是通过内部调用其自身的 start(List<String> addresses)方法,遍历每个ClientJMeterEngine 并通过RMI Call来调用Slave 来运行测试
public void start(List<String> addresses) {
println("Starting remote engines");
long now = System.currentTimeMillis();
println("Starting the test @ " + new Date(now) + " (" + now + ")");
for (String address : addresses) {
try {
if (engines.containsKey(address)) {
engines.get(address).runTest();
} else {
log.warn(HOST_NOT_FOUND_MESSAGE, address);
}
} catch (IllegalStateException | JMeterEngineException e) { // NOSONAR already reported to user
JMeterUtils.reportErrorToUser(e.getMessage(), JMeterUtils.getResString("remote_error_starting")); // $NON-NLS-1$
}
}
println("Remote engines have been started");
}

###ClientJMeterEngine
最后我们来看一下ClientJMeterEngine怎么工作的,它是一个JMeterEngine接口的实现,但是内部是一个RMI Client。首先是在构造函数的时候来创建一个RMI Client

Registry registry = LocateRegistry.getRegistry(
host,
port,
RmiUtils.createClientSocketFactory());
Remote remobj = registry.lookup(name);
if (remobj instanceof RemoteJMeterEngine){
final RemoteJMeterEngine rje = (RemoteJMeterEngine) remobj;
if (remobj instanceof RemoteObject){
RemoteObject robj = (RemoteObject) remobj;
System.out.println("Using remote object: "+robj.getRef().remoteToString()); // NOSONAR
}
return rje;
}

然后我们再看一下它怎么实现JMeterEngine中定义的方法就一目了然了,我们看一个stopTest()方法,其实就是通过 RMI Client 来调用远程的方法
@Override
public void stopTest(boolean now) {
log.info("About to {} remote test on {}", now ? "stop" : "shutdown", hostAndPort);
try {
remote.rstopTest(now);
} catch (Exception ex) {
log.error("", ex); // $NON-NLS-1$
}
}

分布式的内容差不多就介绍到这里

Author: 妞爸爸
Link: http://markshao.github.io/2019/03/12/distributed-code-analysis/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.