JMeter 源码解读 [7] - JMeter 线程模型

JMeter 的线程管理主要是通过两个类来进行,一个是ThreadGroup 顾名思义整个是一个线程的集合,另外一个是JMeterThread 整个可以理解为是一个工作线程,用来做实际请求发送的工作。上一次分析StandardJMeterEnginet提到它最后实际是构造了一个ThreadGroup的对象,然后通过调用startThreadGroup方法来启动线程组,实际内部就调用ThreadGroup自身的start方法,那么我们来看一下这个方法的核心在做什么

ThreadGroup.start

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
delayedStartup = isDelayedStartup(); // Fetch once; needs to stay constant
log.info("Starting thread group... number={} threads={} ramp-up={} perThread={} delayedStart={}", groupNumber,
numThreads, rampUpPeriodInSeconds, perThreadDelayInMillis, delayedStartup);
if (delayedStartup) {
threadStarter = new Thread(new ThreadStarter(notifier, threadGroupTree, engine), getName()+"-ThreadStarter");
threadStarter.setDaemon(true);
threadStarter.start();
// N.B. we don't wait for the thread to complete, as that would prevent parallel TGs
} else {
long now = System.currentTimeMillis(); // needs to be same time for all threads in the group
final JMeterContext context = JMeterContextService.getContext();
for (int threadNum = 0; running && threadNum < numThreads; threadNum++) {
startNewThread(notifier, threadGroupTree, engine, threadNum, context, now, (int)(threadNum * perThreadDelayInMillis));
}
}
  1. 首先是获取一个delayedStartup 的状态,这个表示的启动线程组的方式,是否是立即启动全部,还是说通过一个period 的周期来启动整个线程组
  2. 如果delayedStartup是true 的话,那么就会先启动过一个叫ThreadStarter 的线程,通过它来启动过线程组
  3. 反之就直接通过循环调用startNewThread 这个函数的方式来启动线程组

###看一下 ThreadStarter 线程是怎么启动线程组的

我们知道JMeter在启动线程组的时候是有设置Ramp up period的模式,整个就是通过ThreadStarter线程来实现的,我么简单看一下它的run() 函数是咋么工作的

1
2
3
4
5
6
7
8
9
10
final int perThreadDelayInMillis = Math.round((float) (getRampUp() * 1000) / (float) numThreads);
for (int threadNumber = 0; running && threadNumber < numThreads; threadNumber++) {
if (threadNumber > 0) {
pause(perThreadDelayInMillis); // ramp-up delay (except first)
}
if (usingScheduler && System.currentTimeMillis() > endtime) {
break; // no point continuing beyond the end time
}
JMeterThread jmThread = makeThread(notifier, threadGroupTree, engine, threadNumber, context);
jmThread.setInitialDelay(0); // Already waited
  1. 首先计算一下每个线程启动的时间等待间隔,final int perThreadDelayInMillis = Math.round((float) (getRampUp() * 1000) / (float) numThreads);
  2. 然后就是每次启动一个线程的时候,等待一下 pause(perThreadDelayInMillis)

我们再看一下startNewThread 是怎么启动线程的

1
2
3
4
5
6
7
8
9
10
private JMeterThread startNewThread(ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine,
int threadNum, final JMeterContext context, long now, int delay) {
JMeterThread jmThread = makeThread(notifier, threadGroupTree, engine, threadNum, context);
scheduleThread(jmThread, now); // set start and end time
jmThread.setInitialDelay(delay);
Thread newThread = new Thread(jmThread, jmThread.getThreadName());
registerStartedThread(jmThread, newThread);
newThread.start();
return jmThread;
}

非常直观,创建一个JMeterThread 对象,做一些设置,然后放到一个Thread 里面跑就好了,registerStartedThread(jmThread, newThread); 这个调用是在ThreadGroup的一个Map 里把JMeterThread和真实的Thread关系存一下,方便后面的查找

JMeterThread

最后我们来看一下核心的JMeterThread 工作线程是怎么work的。首先我们要找一个答案,它怎么处理‘Loop Count / Forever’ 的运行模式的

我之前在设计依图自身的性能测试框架的时候对快问题采用的设计方式是通过producer / consumer的方式把系统解耦,通过一个 Queue来出来,所以对于JMeter的处理方式很好奇。看了代码发现 JMeter的实现非常简答,我们看一下run 函数头部的几个循环判断条件就能找到答案了

1
2
3
4
5
6
7
8
9
10
11
12
public void run() {
// threadContext is not thread-safe, so keep within thread
JMeterContext threadContext = JMeterContextService.getContext();
LoopIterationListener iterationListener = null;

try {
iterationListener = initRun(threadContext);
while (running) {
Sampler sam = threadGroupLoopController.next();
while (running && sam != null) {
processSampler(sam, null, threadContext);
threadContext.cleanAfterSample();
  1. Sampler 表示一次请求,每次通过threadGroupLoopController.next() 这个迭代器来获取下一个Sampler对象,当 sampler 不为空的时候就继续发请求
  2. threadGroupLoopController 这个对象实际是一个Controller接口的具体实现, Controller 的实现有很多类型,我们可以在界面上选择,也可以在配置文件里面修改, 下面就是用了一个 LoopController
1
<elementProp name="ThreadGroup.main_controller" elementType="LoopController" guiclass="LoopControlPanel" testclass="LoopController" testname="Loop Controller" enabled="true">
  1. 当controller对象的isDone判定为真的时候,就把running状态设置成False,那么这个JMeterThread 也就结束生命了
1
2
3
4
if (threadGroupLoopController.isDone()) {
running = false;
log.info("Thread is done: {}", threadName);
}

线程设计模型的更多思考

参考之前Gatling的使用来看,它对于并发模型本身的支持粒度非常细,例如支持恒定并发,恒定 QPS 等不同并发场景的设计

我在自己设计 Atom Integraion的时候也希望这个框架能在并发模型支持的灵获性上做的更好,通过引入Queue 把 Scheduler 和 Load Generator 分离,我就可以通过写入 Queue 的速度来控制最终 Load Generator 的并发模型

从以前使用 JMeter的经验来看,它对于并发模型的支持是比较有限的,从它目前的设计来看,整个系统的解耦做的是不太好的,首先每个JMeterThread的工作是独立的 , 虽然它们是共享同一个threadGroupLoopController对象,但是通过一个迭代器来控制并发的粒度,实在是有点难,不知道未来JMeter 开发团队对于并发模型支持这块会不会有更多的计划 。 这里只是我的一些愚见, 也欢迎大家留言和我讨论。